
A producer and consumer model based on the zbus message queue

程序员文章站 2022-07-03 08:33:51
zbus is a message queue. ZBUS = MQ + RPC + PROXY: it supports message queues, publish/subscribe, RPC, and proxying (TCP/HTTP/DMZ).

Consumer
ZConsumer.java
package com.gbcom.frame.zbus;

import java.io.IOException;

import org.zbus.broker.Broker;
import org.zbus.broker.ZbusBroker;
import org.zbus.mq.Consumer;
import org.zbus.mq.Consumer.ConsumerHandler;
import org.zbus.mq.server.MqServer;
import org.zbus.mq.server.MqServerConfig;
import org.zbus.net.http.Message;

/**
 * Consumer: embeds a zbus server and registers a message handler for the queue.
 * @author SYZ
 * @date 2016-6-14 02:58:50 PM
 * @version 1.0.0
 * @see com.gbcom.frame.zbus.ZConsumer
 */
public class ZConsumer {

	/**
	 * Entry point (ZConsumer.main).
	 * @param args command-line arguments (unused)
	 * @throws Exception if the server or the consumer fails to start
	 */
	public static void main(String[] args) throws Exception {
		try {
			start();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	private static void start() throws Exception{
		
		// Embed the zbus message server in this process.
		MqServerConfig config = new MqServerConfig();
		config.serverPort = 15555;
		config.storePath = "./store";
		final MqServer server = new MqServer(config);
		server.start(); 
		
		Broker broker = new ZbusBroker("127.0.0.1:15555"); //SingleBroker
		Consumer consumer = new Consumer(broker, "MyMQ");  
		consumer.start(new ConsumerHandler() { 
		    @Override
		    public void handle(Message msg, Consumer consumer) throws IOException { 
		        // Message callback: handle each received message here.
		        System.out.println(msg);
		    }
		}); 
	}

}




Producer

ZProduct.java
package com.gbcom.frame.zbus;

import java.io.IOException;

import org.zbus.broker.Broker;
import org.zbus.broker.ZbusBroker;
import org.zbus.mq.Producer;
import org.zbus.net.http.Message;
/**
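 * Requires a running zbus server, as is the case with any message middleware.
 * 
 * If no standalone zbus server is running, the server must be embedded in the
 * application instead, as ZConsumer.java does.
 * 
 * @author SYZ
 * @date 2016-8-12 05:38:35 PM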
 * @version 1.0.0
 * @see com.gbcom.frame.zbus.ZProduct
 */
public class ZProduct {

	/**
	 * Entry point (ZProduct.main).
	 * @param args command-line arguments (unused)
	 */
	public static void main(String[] args) {
		try {
			start();
		} catch (IOException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	private static void start() throws IOException, InterruptedException{
		Broker broker = new ZbusBroker("127.0.0.1:15555"); //SingleBroker
//		Broker broker = new ZbusBroker("127.0.0.1:16666;127.0.0.1:16667"); //HaBroker
//		Broker broker = new ZbusBroker("jvm"); //JvmBroker
		
		Producer producer = new Producer(broker, "MyMQ");
		producer.createMQ(); // The message queue must be created with an explicit call.



		for (int i = 0; i < 10; i++) {
			Message msg = new Message();
			msg.setBody("hello world-"+i);
			Message res = producer.sendSync(msg, 1000);
			System.out.println(res);
		}
		
		broker.close();
		
	}

}
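
The two classes above exchange messages through a zbus broker, so they need a running (or embedded) zbus server. To see the producer/consumer pattern on its own, here is a minimal sketch that uses only the JDK. It is not the zbus API: a `java.util.concurrent.BlockingQueue` stands in for the broker and the "MyMQ" queue, and the names `InProcessQueueSketch`, `run`, and the `STOP` sentinel are hypothetical, chosen for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustration only: an in-process stand-in for the broker/queue, not zbus itself.
public class InProcessQueueSketch {

    // Hypothetical sentinel that tells the consumer thread to stop.
    private static final String STOP = "__stop__";

    // Sends `count` messages through a queue and returns what the consumer received.
    static List<String> run(int count) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> received = new ArrayList<>();

        // Consumer: blocks on take() until a message arrives, like a handler callback.
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String msg = queue.take();
                    if (STOP.equals(msg)) {
                        break;
                    }
                    received.add(msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Producer: same message bodies as the ZProduct loop.
        for (int i = 0; i < count; i++) {
            queue.put("hello world-" + i);
        }
        queue.put(STOP);

        // join() makes the consumer's writes to `received` visible to this thread.
        consumer.join();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        for (String msg : run(10)) {
            System.out.println(msg);
        }
    }
}
```

Unlike this sketch, a real broker decouples the two sides across processes and machines and can persist messages (the `storePath` setting in ZConsumer), which is why the zbus versions need a server.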
