基于zbus消息队列的 生产者和消费者模型 zbus
程序员文章站
2022-07-03 08:33:51
...
zubs是一个消息队列;; ZBUS = MQ + RPC + PROXY 支持消息队列, 发布订阅, RPC, 代理(TCP/HTTP/DMZ)
消费者
ZConsumer.java
生产者
ZProduct.java
消费者
ZConsumer.java
package com.gbcom.frame.zbus; import java.io.IOException; import org.zbus.broker.Broker; import org.zbus.broker.ZbusBroker; import org.zbus.mq.Consumer; import org.zbus.mq.Consumer.ConsumerHandler; import org.zbus.mq.server.MqServer; import org.zbus.mq.server.MqServerConfig; import org.zbus.net.http.Message; /** * 消费者:嵌入zbus服务器,订阅消息处理器 * @author SYZ * @date 2016-6-14 下午02:58:50 * @version 1.0.0 * @see com.gbcom.frame.zbus.ZConsumer */ public class ZConsumer { /** : (ZConsumer.main) * @param args * @throws Exception */ public static void main(String[] args) throws Exception { try { start(); } catch (IOException e) { e.printStackTrace(); } } private static void start() throws Exception{ //嵌入zbus消息服务器。 MqServerConfig config = new MqServerConfig(); config.serverPort = 15555; config.storePath = "./store"; final MqServer server = new MqServer(config); server.start(); Broker broker = new ZbusBroker("127.0.0.1:15555"); //SingleBroker Consumer consumer = new Consumer(broker, "MyMQ"); consumer.start(new ConsumerHandler() { @Override public void handle(Message msg, Consumer consumer) throws IOException { //消息回调处理 System.out.println(msg); } }); } }
生产者
ZProduct.java
package com.gbcom.frame.zbus; import java.io.IOException; import org.zbus.broker.Broker; import org.zbus.broker.ZbusBroker; import org.zbus.mq.Producer; import org.zbus.net.http.Message; /** * 需要开启 zbus服务器。。消息中间件都是这样。 * * 如果不开启zbus 需要嵌入到服务器中个,例如 zconsumer.java中 * * @author SYZ * @date 2016-8-12 下午05:38:35 * @version 1.0.0 * @see com.gbcom.frame.zbus.ZProduct */ public class ZProduct { /** : (ZClient.main) * @param args */ public static void main(String[] args) { // TODO Auto-generated method stub try { start(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } private static void start() throws IOException, InterruptedException{ Broker broker = new ZbusBroker("127.0.0.1:15555"); //SingleBroker // Broker broker = new ZbusBroker("127.0.0.1:16666;127.0.0.1:16667"); //HaBroker // Broker broker = new ZbusBroker("jvm"); //JvmBroker Producer producer = new Producer(broker, "MyMQ"); producer.createMQ();//确定为创建消息队列需要显示调用 for (int i = 0; i < 10; i++) { Message msg = new Message(); msg.setBody("hello world-"+i); Message res = producer.sendSync(msg, 1000); System.out.println(res); } broker.close(); } }