欢迎您访问程序员文章站,本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

【Hazelcast】PubSub 发布/订阅

程序员文章站 2022-05-21 13:42:30
...

① 建立股票价格类

package com.cqh.Hazelcast_PubSub;

import java.io.Serializable;
import java.math.BigDecimal;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Immutable bid/ask quote for a single stock at a given moment.
 *
 * <p>Requirement: a stock's buy (bid) and sell (ask) prices must be publishable to
 * market makers at any time, so the class is {@link Serializable}.
 *
 * <p>Created by yl1794 on 2018/4/27.
 */
public class StockPrice implements Serializable{
    private static final long serialVersionUID = 1L;

    /** Price offered by buyers. */
    private final BigDecimal bid;

    /** Price asked by sellers. */
    private final BigDecimal ask;

    /** Stock code, e.g. a ticker symbol. */
    private final String code;

    /** Human-readable description of the stock. */
    private final String description;

    /** Moment of the quote, in milliseconds since the epoch (it is passed to {@code new Date(long)}). */
    private final long timestamp;

    /**
     * Create a StockPrice for the given stock at a given moment.
     *
     * @param bid         the bid price
     * @param ask         the ask price
     * @param code        the stock code
     * @param description the stock description
     * @param timestamp   the quote time in milliseconds since the epoch
     */
    public StockPrice(BigDecimal bid, BigDecimal ask, String code, String description,
                      long timestamp) {
        this.bid = bid;
        this.ask = ask;
        this.code = code;
        this.description = description;
        this.timestamp = timestamp;
    }

    /** @return the bid price */
    public BigDecimal getBid() {
        return bid;
    }

    /** @return the ask price */
    public BigDecimal getAsk() {
        return ask;
    }

    /** @return the stock code */
    public String getCode() {
        return code;
    }

    /** @return the stock description */
    public String getDescription() {
        return description;
    }

    /** @return the quote time in milliseconds since the epoch */
    public long getTimestamp() {
        return timestamp;
    }

    /**
     * Renders the quote as
     * {@code Stock - <code> - <description> - Bid: <bid> - Ask: <ask> - <HH:mm:ss>}.
     *
     * @return a single-line, human-readable representation of this quote
     */
    @Override
    public String toString() {
        // Pattern letters are case-sensitive: "mm" is minutes and "ss" is seconds,
        // whereas "MM" would print the month and "SS" the milliseconds.
        SimpleDateFormat timeFormat = new SimpleDateFormat("HH:mm:ss");

        StringBuilder sb = new StringBuilder("Stock - ");
        sb.append(code);
        sb.append(" - ");
        sb.append(description);
        sb.append(" - Bid: ");
        sb.append(bid);
        sb.append(" - Ask: ");
        sb.append(ask);
        sb.append(" - ");
        sb.append(timeFormat.format(new Date(timestamp)));
        return sb.toString();
    }
}

② 建立发布者

package com.cqh.Hazelcast_PubSub;

import com.google.common.annotations.VisibleForTesting;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic;

import java.math.BigDecimal;
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.util.Locale;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * Publisher: sets up Hazelcast, obtains a topic for stock prices and repeatedly
 * publishes a randomly generated {@link StockPrice} for one stock.
 *
 * <p>The actual publication happens in {@link #run()} via {@code topic.publish(price)}.
 *
 * <p>Created by yl1794 on 2018/4/27.
 */
public class MarketMaker implements Runnable {

    /** Shared source of randomness for the generated prices. */
    private static final Random random = new Random();

    private final String stockCode;

    private final String description;

    private final ITopic<StockPrice> topic;

    /** Loop flag; volatile because {@link #stop()} is called from another thread. */
    private volatile boolean running;

    /**
     * Creates a publisher for one stock.
     *
     * @param topicName   name of the topic to publish to
     * @param stockCode   code of the stock whose prices are generated
     * @param description description of the stock
     */
    public MarketMaker(String topicName, String stockCode, String description) {
        this.stockCode = stockCode;
        this.description = description;
        // NOTE: createTopic is overridable (package-private for tests) and is invoked
        // before construction completes; overrides must not rely on subclass state.
        this.topic = createTopic(topicName);
        running = true;
    }

    /**
     * Creates a new Hazelcast instance and returns its topic with the given name.
     *
     * @param topicName name of the topic
     * @return the topic used for publishing prices
     */
    @VisibleForTesting
    ITopic<StockPrice> createTopic(String topicName) {
        HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance();
        return hzInstance.getTopic(topicName);
    }

    /** Starts publishing prices on a new thread. */
    public void publishPrices() {
        Thread thread = new Thread(this);
        thread.start();
    }

    /**
     * Publishes a price, sleeps, and repeats until {@link #stop()} is called or the
     * thread is interrupted. At least one price is always published.
     */
    @Override
    public void run() {
        do {
            publish();
            sleep();
        } while (running);
    }

    /** Generates one price, prints it followed by an empty line, and publishes it. */
    private void publish() {
        StockPrice price = createStockPrice();
        System.out.println(price.toString());
        System.out.println("");
        topic.publish(price);
    }

    /**
     * Builds a quote around a random mid price: bid is 1% below it and ask is 1% above,
     * both formatted to at most two decimals.
     *
     * @return a new quote stamped with the current time
     */
    @VisibleForTesting
    StockPrice createStockPrice() {

        double price = createPrice();
        // Pin the symbols to Locale.ROOT: with default-locale symbols a comma decimal
        // separator (e.g. "15,2") would make new BigDecimal(String) throw
        // NumberFormatException.
        DecimalFormat df = new DecimalFormat("#.##", DecimalFormatSymbols.getInstance(Locale.ROOT));

        BigDecimal bid = new BigDecimal(df.format(price - variance(price)));
        BigDecimal ask = new BigDecimal(df.format(price + variance(price)));

        return new StockPrice(bid, ask, stockCode, description, System.currentTimeMillis());
    }

    /**
     * @return a random price between 15.20 (inclusive) and 20.10 (exclusive),
     *         in steps of 0.01
     */
    private double createPrice() {
        int val = random.nextInt(2010 - 1520) + 1520;
        return (double) val / 100;
    }

    /**
     * @param price the mid price
     * @return 1% of {@code price}, the half-spread applied on each side
     */
    private double variance(double price) {
        return (price * 0.01);
    }

    /**
     * Pauses for two seconds between publications. If the thread is interrupted, the
     * interrupt flag is restored and the publishing loop is stopped; otherwise every
     * later sleep would fail immediately and the loop would spin.
     */
    private void sleep() {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            running = false;
        }
    }

    /** Asks the publishing loop to finish after its current iteration. */
    public void stop() {
        running = false;
    }

    /**
     * Demo entry point: starts three publishers on the "STOCKS" topic. Each
     * MarketMaker creates its own Hazelcast instance through {@code createTopic}.
     */
    public static void main(String[] args) throws InterruptedException {

        MarketMaker bt = new MarketMaker("STOCKS", "AAA", "AAAAAA");
        MarketMaker cbry = new MarketMaker("STOCKS", "BBB", "BBBBBB");
        MarketMaker bp = new MarketMaker("STOCKS", "CCC", "CCCCCC");

        bt.publishPrices();
        cbry.publishPrices();
        bp.publishPrices();

    }
}

③ 建立订阅者

package com.cqh.Hazelcast_PubSub;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener;
/**
 * Subscriber: registers itself as a listener on a Hazelcast topic and prints every
 * {@link StockPrice} message it receives.
 *
 * <p>Created by yl1794 on 2018/4/27.
 */
public class Client implements MessageListener<StockPrice>{

    /**
     * Creates a new Hazelcast instance and subscribes this client to the named topic.
     *
     * @param topicName name of the topic to listen on
     */
    public Client(String topicName) {
        ITopic<StockPrice> stockTopic = Hazelcast.newHazelcastInstance().getTopic(topicName);
        stockTopic.addMessageListener(this);
    }

    /**
     * Prints the received stock price to standard output, prefixed with "Received: ".
     *
     * @param message the topic message carrying the published {@link StockPrice}
     */
    @Override
    public void onMessage(Message<StockPrice> message) {
        StockPrice received = message.getMessageObject();
        System.out.println("Received: " + received.toString());
    }

    /** Demo entry point: subscribes a client to the "STOCKS" topic. */
    public static void main(String[] args) {
        // The constructor performs the subscription; the reference itself is not needed.
        new Client("STOCKS");
    }

}

结果

假设先执行发布者,结果如下:

【Hazelcast】PubSub 发布/订阅

【Hazelcast】PubSub 发布/订阅

再执行订阅者,结果如下:

【Hazelcast】PubSub 发布/订阅

【Hazelcast】PubSub 发布/订阅