【Hazelcast】PubSub 发布/订阅
程序员文章站
2022-05-21 13:42:30
...
① 建立股票价格类
package com.cqh.Hazelcast_PubSub;
import java.io.Serializable;
import java.math.BigDecimal;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * Created by yl1794 on 2018/4/27.
 *
 * <p>Immutable snapshot of a stock's bid and ask prices at one moment in time.
 * The requirement is that a stock's buy/sell prices can be published to market
 * makers at any time, so the class is {@link Serializable}.
 */
public class StockPrice implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Price offered by buyers. */
    private final BigDecimal bid;

    /** Price asked by sellers. */
    private final BigDecimal ask;

    /** Short stock code. */
    private final String code;

    /** Human-readable description of the stock. */
    private final String description;

    /** Moment of the quote, in milliseconds since the epoch. */
    private final long timestamp;

    /**
     * Create a StockPrice for the given stock at a given moment.
     *
     * @param bid         the bid price
     * @param ask         the ask price
     * @param code        the stock code
     * @param description the stock description
     * @param timestamp   the quote time in milliseconds since the epoch
     */
    public StockPrice(BigDecimal bid, BigDecimal ask, String code, String description,
            long timestamp) {
        super();
        this.bid = bid;
        this.ask = ask;
        this.code = code;
        this.description = description;
        this.timestamp = timestamp;
    }

    /** @return the bid price */
    public BigDecimal getBid() {
        return bid;
    }

    /** @return the ask price */
    public BigDecimal getAsk() {
        return ask;
    }

    /** @return the stock code */
    public String getCode() {
        return code;
    }

    /** @return the stock description */
    public String getDescription() {
        return description;
    }

    /** @return the quote time in milliseconds since the epoch */
    public long getTimestamp() {
        return timestamp;
    }

    /**
     * Renders the quote as
     * {@code Stock - <code> - <description> - Bid: <bid> - Ask: <ask> - <HH:mm:ss>}.
     * The time part is formatted in the JVM's default time zone.
     *
     * @return a single-line, human-readable representation of this quote
     */
    @Override
    public String toString() {
        // "HH:mm:ss" = hour-of-day, minute, second. The upper-case "MM"/"SS"
        // letters mean month and millisecond, which is not a clock time.
        // A new formatter per call keeps this method thread-safe.
        SimpleDateFormat timeFormat = new SimpleDateFormat("HH:mm:ss");

        StringBuilder sb = new StringBuilder("Stock - ");
        sb.append(code);
        sb.append(" - ");
        sb.append(description); // appended exactly once
        sb.append(" - Bid: ");
        sb.append(bid);
        sb.append(" - Ask: ");
        sb.append(ask);
        sb.append(" - ");
        sb.append(timeFormat.format(new Date(timestamp)));
        return sb.toString();
    }
}
② 建立发布者
package com.cqh.Hazelcast_PubSub;
import com.google.common.annotations.VisibleForTesting;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic;
import java.math.BigDecimal;
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.util.Locale;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
 * Created by yl1794 on 2018/4/27.
 *
 * <p>Publisher. Sets up Hazelcast, obtains a topic used for stock publication,
 * and then publishes a randomly generated {@link StockPrice} every two seconds
 * from a background thread. The actual publication is the
 * {@code topic.publish(price)} call made from {@link #run()}.
 */
public class MarketMaker implements Runnable {

    /** Lower bound (inclusive) of the generated price, in hundredths. */
    private static final int MIN_PRICE_CENTS = 1520;

    /** Upper bound (exclusive) of the generated price, in hundredths. */
    private static final int MAX_PRICE_CENTS = 2010;

    /** Fraction of the price subtracted/added to derive bid and ask. */
    private static final double SPREAD_FRACTION = 0.01;

    /** Pause between two publications, in seconds. */
    private static final long PUBLISH_INTERVAL_SECONDS = 2;

    private static final Random RANDOM = new Random();

    private final String stockCode;
    private final String description;
    private final ITopic<StockPrice> topic;

    /** Loop flag; volatile so that {@link #stop()} is seen by the publishing thread. */
    private volatile boolean running;

    /**
     * Creates a market maker for one stock.
     *
     * @param topicName   name of the Hazelcast topic to publish to
     * @param stockCode   code of the stock whose prices are published
     * @param description description of the stock
     */
    public MarketMaker(String topicName, String stockCode, String description) {
        this.stockCode = stockCode;
        this.description = description;
        // NOTE(review): createTopic is overridable and is invoked before the
        // constructor finishes; an override must not rely on this object's state.
        this.topic = createTopic(topicName);
        running = true;
    }

    /**
     * Creates a new {@link HazelcastInstance} and returns the topic with the given name.
     *
     * @param topicName name of the topic
     * @return the topic to publish stock prices to
     */
    @VisibleForTesting
    ITopic<StockPrice> createTopic(String topicName) {
        HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance();
        return hzInstance.getTopic(topicName);
    }

    /** Starts a new thread that runs the publishing loop. */
    public void publishPrices() {
        Thread thread = new Thread(this);
        thread.start();
    }

    /**
     * Publishes a price, then sleeps, and repeats until {@link #stop()} is
     * called or the thread is interrupted. At least one price is published.
     */
    @Override
    public void run() {
        do {
            publish();
            sleep();
        } while (running);
    }

    /** Generates one price, prints it to standard output and publishes it to the topic. */
    private void publish() {
        StockPrice price = createStockPrice();
        System.out.println(price.toString());
        System.out.println("");
        topic.publish(price);
    }

    /**
     * Builds a {@link StockPrice} around a random price: bid is the price minus
     * the variance, ask is the price plus the variance, both formatted with at
     * most two fraction digits, stamped with the current time.
     *
     * @return the newly created stock price
     */
    @VisibleForTesting
    StockPrice createStockPrice() {
        double price = createPrice();
        // Pin the symbols to Locale.ROOT: with the default locale the formatter
        // may emit a decimal comma, which new BigDecimal(String) rejects with a
        // NumberFormatException.
        DecimalFormat df = new DecimalFormat("#.##", DecimalFormatSymbols.getInstance(Locale.ROOT));
        BigDecimal bid = new BigDecimal(df.format(price - variance(price)));
        BigDecimal ask = new BigDecimal(df.format(price + variance(price)));
        return new StockPrice(bid, ask, stockCode, description, System.currentTimeMillis());
    }

    /**
     * @return a random price between 15.20 (inclusive) and 20.10 (exclusive),
     *         in steps of 0.01
     */
    private double createPrice() {
        int cents = RANDOM.nextInt(MAX_PRICE_CENTS - MIN_PRICE_CENTS) + MIN_PRICE_CENTS;
        return (double) cents / 100;
    }

    /**
     * @param price the reference price
     * @return one percent of the given price
     */
    private double variance(double price) {
        return price * SPREAD_FRACTION;
    }

    /**
     * Pauses the publishing thread between two publications. If the thread is
     * interrupted, the interrupt flag is restored and the loop is asked to end;
     * merely continuing would make every later sleep fail immediately.
     */
    private void sleep() {
        try {
            TimeUnit.SECONDS.sleep(PUBLISH_INTERVAL_SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            running = false;
        }
    }

    /** Asks the publishing loop to end after its current iteration. */
    public void stop() {
        running = false;
    }

    /**
     * Starts three market makers that all publish to the "STOCKS" topic.
     *
     * @param args unused
     * @throws InterruptedException never thrown by this implementation; kept for compatibility
     */
    public static void main(String[] args) throws InterruptedException {
        MarketMaker bt = new MarketMaker("STOCKS", "AAA", "AAAAAA");
        MarketMaker cbry = new MarketMaker("STOCKS", "BBB", "BBBBBB");
        MarketMaker bp = new MarketMaker("STOCKS", "CCC", "CCCCCC");
        bt.publishPrices();
        cbry.publishPrices();
        bp.publishPrices();
    }
}
③ 建立订阅者
package com.cqh.Hazelcast_PubSub;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener;
/**
 * Created by yl1794 on 2018/4/27.
 *
 * <p>Subscriber. Creates a new Hazelcast instance, registers itself as a
 * listener on the named topic and prints every {@link StockPrice} it receives
 * to standard output.
 */
public class Client implements MessageListener<StockPrice> {

    /**
     * Subscribes this client to the given topic.
     *
     * @param topicName name of the topic to listen on
     */
    public Client(String topicName) {
        ITopic<StockPrice> stockTopic = Hazelcast.newHazelcastInstance().getTopic(topicName);
        stockTopic.addMessageListener(this);
    }

    /**
     * Prints the received stock price, prefixed with "Received: ".
     *
     * @param arg0 the message delivered by the topic
     * @see com.hazelcast.core.MessageListener#onMessage(com.hazelcast.core.Message)
     */
    @Override
    public void onMessage(Message<StockPrice> arg0) {
        StockPrice received = arg0.getMessageObject();
        System.out.println("Received: " + received.toString());
    }

    /**
     * Starts a single subscriber on the "STOCKS" topic.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        new Client("STOCKS");
    }
}
④ 结果
假设先执行发布者,结果如下:
再执行订阅者,结果如下: