Java编程rabbitMQ实现消息的收发
java实现ramqp,即advanced message queuing protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
amqp的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
rabbitmq是一个开源的amqp实现,服务器端用erlang语言编写,支持多种客户端,如:python、ruby、.net、java、jms、c、php、actionscript、xmpp、stomp等,支持ajax。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
本文不介绍amqp和rabbitmq相关知识,请自行网上查阅
本文是基于spring-rabbit中间件来实现消息的发送接受功能
see
see
java编程通过操作rabbitmq消息的收发实现代码如下:
<!-- for rabbitmq --> <dependency> <groupid>com.rabbitmq</groupid> <artifactid>amqp-client</artifactid> <version>2.8.2</version> </dependency> <dependency> <groupid>org.springframework.amqp</groupid> <artifactid>spring-amqp</artifactid> <version>1.1.1.release</version> </dependency> <dependency> <groupid>org.springframework.amqp</groupid> <artifactid>spring-rabbit</artifactid> <version>1.1.1.release</version> </dependency> <dependency> <groupid>com.caucho</groupid> <artifactid>hessian</artifactid> <version>4.0.7</version> </dependency> </dependencies>
首先我们需要一个用来在app和rabbitmq之间传递消息的持有对象
public class eventmessage implements serializable{ private string queuename; private string exchangename; private byte[] eventdata; public eventmessage(string queuename, string exchangename, byte[] eventdata) { this.queuename = queuename; this.exchangename = exchangename; this.eventdata = eventdata; } public eventmessage() { } public string getqueuename() { return queuename; } public string getexchangename() { return exchangename; } public byte[] geteventdata() { return eventdata; } @override public string tostring() { return "eopeventmessage [queuename=" + queuename + ", exchangename=" + exchangename + ", eventdata=" + arrays.tostring(eventdata) + "]"; } }
为了可以发送和接受这个消息持有对象,我们还需要需要一个用来序列化和反序列化的工厂
public interface codecfactory { byte[] serialize(object obj) throws ioexception; object deserialize(byte[] in) throws ioexception; }
下面是编码解码的实现类,用了hessian来实现,大家可以自行选择序列化方式
public class hessioncodecfactory implements codecfactory { private final logger logger = logger.getlogger(hessioncodecfactory.class); @override public byte[] serialize(object obj) throws ioexception { bytearrayoutputstream baos = null; hessianoutput output = null; try { baos = new bytearrayoutputstream(1024); output = new hessianoutput(baos); output.startcall(); output.writeobject(obj); output.completecall(); } catch (final ioexception ex) { throw ex; } finally { if (output != null) { try { baos.close(); } catch (final ioexception ex) { this.logger.error("failed to close stream.", ex); } } } return baos != null ? baos.tobytearray() : null; } @override public object deserialize(byte[] in) throws ioexception { object obj = null; bytearrayinputstream bais = null; hessianinput input = null; try { bais = new bytearrayinputstream(in); input = new hessianinput(bais); input.startreply(); obj = input.readobject(); input.completereply(); } catch (final ioexception ex) { throw ex; } catch (final throwable e) { this.logger.error("failed to decode object.", e); } finally { if (input != null) { try { bais.close(); } catch (final ioexception ex) { this.logger.error("failed to close stream.", ex); } } } return obj; } }
接下来就先实现发送功能,新增一个接口专门用来实现发送功能
public interface eventtemplate { void send(string queuename,string exchangename,object eventcontent) throws sendrefuseexception; void send(string queuename,string exchangename,object eventcontent,codecfactory codecfactory) throws sendrefuseexception; }
sendrefuseexception是自定义的发送失败异常类
下面是它的实现类,主要的任务就是将数据转换为eventmessage
public class defaulteventtemplate implements eventtemplate { private static final logger logger = logger.getlogger(defaulteventtemplate.class); private amqptemplate eventamqptemplate; private codecfactory defaultcodecfactory; // private defaulteventcontroller eec; // public defaulteventtemplate(amqptemplate eopamqptemplate, // codecfactory defaultcodecfactory, defaulteventcontroller eec) { // this.eventamqptemplate = eopamqptemplate; // this.defaultcodecfactory = defaultcodecfactory; // this.eec = eec; // } public defaulteventtemplate(amqptemplate eopamqptemplate,codecfactory defaultcodecfactory) { this.eventamqptemplate = eopamqptemplate; this.defaultcodecfactory = defaultcodecfactory; } @override public void send(string queuename, string exchangename, object eventcontent) throws sendrefuseexception { this.send(queuename, exchangename, eventcontent, defaultcodecfactory); } @override public void send(string queuename, string exchangename, object eventcontent, codecfactory codecfactory) throws sendrefuseexception { if (stringutils.isempty(queuename) || stringutils.isempty(exchangename)) { throw new sendrefuseexception("queuename exchangename can not be empty."); } // if (!eec.bebinded(exchangename, queuename)) // eec.declarebinding(exchangename, queuename); byte[] eventcontentbytes = null; if (codecfactory == null) { if (eventcontent == null) { logger.warn("find eventcontent is null,are you sure..."); } else { throw new sendrefuseexception( "codecfactory must not be null ,unless eventcontent is null"); } } else { try { eventcontentbytes = codecfactory.serialize(eventcontent); } catch (ioexception e) { throw new sendrefuseexception(e); } } // 构造成message eventmessage msg = new eventmessage(queuename, exchangename, eventcontentbytes); try { eventamqptemplate.convertandsend(exchangename, queuename, msg); } catch (amqpexception e) { logger.error("send event fail. event message : [" + eventcontent + "]", e); throw new sendrefuseexception("send event fail", e); } } }
注释的地方稍后会用到,主要是防止数据数据发送的地方没有事先声明
然后我们再实现接受消息
首先我们需要一个消费接口,所有的消费程序都实现这个类
public interface eventprocesser { public void process(object e); }
为了能够将不同类型的消息交由对应的程序来处理,我们还需要一个消息处理适配器
/** * messagelisteneradapter的pojo * <p>消息处理适配器,主要功能:</p> * <p>1、将不同的消息类型绑定到对应的处理器并本地缓存,如将queue01+exchange01的消息统一交由a处理器来出来</p> * <p>2、执行消息的消费分发,调用相应的处理器来消费属于它的消息</p> * */ public class messageadapterhandler { private static final logger logger = logger.getlogger(messageadapterhandler.class); private concurrentmap<string, eventprocessorwrap> epwmap; public messageadapterhandler() { this.epwmap = new concurrenthashmap<string, eventprocessorwrap>(); } public void handlemessage(eventmessage eem) { logger.debug("receive an eventmessage: [" + eem + "]"); // 先要判断接收到的message是否是空的,在某些异常情况下,会产生空值 if (eem == null) { logger.warn("receive an null eventmessage, it may product some errors, and processing message is canceled."); return; } if (stringutils.isempty(eem.getqueuename()) || stringutils.isempty(eem.getexchangename())) { logger.warn("the eventmessage's queuename and exchangename is empty, this is not allowed, and processing message is canceled."); return; } // 解码,并交给对应的eventhandle执行 eventprocessorwrap eepw = epwmap.get(eem.getqueuename()+"|"+eem.getexchangename()); if (eepw == null) { logger.warn("receive an eopeventmessage, but no processor can do it."); return; } try { eepw.process(eem.geteventdata()); } catch (ioexception e) { logger.error("event content can not be deserialized, check the provided codecfactory.",e); return; } } protected void add(string queuename, string exchangename, eventprocesser processor,codecfactory codecfactory) { if (stringutils.isempty(queuename) || stringutils.isempty(exchangename) || processor == null || codecfactory == null) { throw new runtimeexception("queuename and exchangename can not be empty,and processor or codecfactory can not be null. "); } eventprocessorwrap epw = new eventprocessorwrap(codecfactory,processor); eventprocessorwrap oldprocessorwrap = epwmap.putifabsent(queuename + "|" + exchangename, epw); if (oldprocessorwrap != null) { logger.warn("the processor of this queue and exchange exists, and the new one can't be add"); } } protected set<string> getallbinding() { set<string> keyset = epwmap.keyset(); return keyset; } protected static class eventprocessorwrap { private codecfactory codecfactory; private eventprocesser eep; protected eventprocessorwrap(codecfactory codecfactory, eventprocesser eep) { this.codecfactory = codecfactory; this.eep = eep; } public void process(byte[] eventdata) throws ioexception{ object obj = codecfactory.deserialize(eventdata); eep.process(obj); } } }
这是正常情况下的消息处理方式,如果rabbitmq消息接受发生异常,也要监控到,新增一个消费类专门用来处理错误异常的消息
public class messageerrorhandler implements errorhandler{ private static final logger logger = logger.getlogger(messageerrorhandler.class); @override public void handleerror(throwable t) { logger.error("rabbitmq happen a error:" + t.getmessage(), t); } }
接下来我们可能需要一个专门配置和rabbitmq通信的一些信息,比如地址,端口等信息
public class eventcontrolconfig { private final static int default_port = 5672; private final static string default_username = "guest"; private final static string default_password = "guest"; private final static int default_process_thread_num = runtime.getruntime().availableprocessors() * 2; private static final int prefetch_size = 1; private string serverhost ; private int port = default_port; private string username = default_username; private string password = default_password; private string virtualhost; /** * 和rabbitmq建立连接的超时时间 */ private int connectiontimeout = 0; /** * 事件消息处理线程数,默认是 cpu核数 * 2 */ private int eventmsgprocessnum; /** * 每次消费消息的预取值 */ private int prefetchsize; public eventcontrolconfig(string serverhost) { this(serverhost,default_port,default_username,default_password,null,0,default_process_thread_num,default_process_thread_num,new hessioncodecfactory()); } public eventcontrolconfig(string serverhost, int port, string username, string password, string virtualhost, int connectiontimeout, int eventmsgprocessnum,int prefetchsize,codecfactory defaultcodecfactory) { this.serverhost = serverhost; this.port = port>0?port:default_port; this.username = username; this.password = password; this.virtualhost = virtualhost; this.connectiontimeout = connectiontimeout>0?connectiontimeout:0; this.eventmsgprocessnum = eventmsgprocessnum>0?eventmsgprocessnum:default_process_thread_num; this.prefetchsize = prefetchsize>0?prefetchsize:prefetch_size; } public string getserverhost() { return serverhost; } public int getport() { return port; } public string getusername() { return username; } public string getpassword() { return password; } public string getvirtualhost() { return virtualhost; } public int getconnectiontimeout() { return connectiontimeout; } public int geteventmsgprocessnum() { return eventmsgprocessnum; } public int getprefetchsize() { return prefetchsize; } }
具体的发送、接受程序已经好了,接下来也是最重要的就是管理控制和rabbitmq的通信
public interface eventcontroller { /** * 控制器启动方法 */ void start(); /** * 获取发送模版 */ eventtemplate geteopeventtemplate(); /** * 绑定消费程序到对应的exchange和queue */ eventcontroller add(string queuename, string exchangename, eventprocesser eventprocesser); /*in map, the key is queue name, but value is exchange name*/ eventcontroller add(map<string,string> bindings, eventprocesser eventprocesser); }
它的实现类如下:
/** * 和rabbitmq通信的控制器,主要负责: * <p>1、和rabbitmq建立连接</p> * <p>2、声明exchange和queue以及它们的绑定关系</p> * <p>3、启动消息监听容器,并将不同消息的处理者绑定到对应的exchange和queue上</p> * <p>4、持有消息发送模版以及所有exchange、queue和绑定关系的本地缓存</p> * @author yangyong * */ public class defaulteventcontroller implements eventcontroller { private cachingconnectionfactory rabbitconnectionfactory; private eventcontrolconfig config; private rabbitadmin rabbitadmin; private codecfactory defaultcodecfactory = new hessioncodecfactory(); private simplemessagelistenercontainer msglistenercontainer; // rabbitmq msg listener container private messageadapterhandler msgadapterhandler = new messageadapterhandler(); private messageconverter serializermessageconverter = new serializermessageconverter(); // 直接指定 //queue cache, key is exchangename private map<string, directexchange> exchanges = new hashmap<string,directexchange>(); //queue cache, key is queuename private map<string, queue> queues = new hashmap<string, queue>(); //bind relation of queue to exchange cache, value is exchangename | queuename private set<string> binded = new hashset<string>(); private eventtemplate eventtemplate; // 给app使用的event发送客户端 private atomicboolean isstarted = new atomicboolean(false); private static defaulteventcontroller defaulteventcontroller; public synchronized static defaulteventcontroller getinstance(eventcontrolconfig config){ if(defaulteventcontroller==null){ defaulteventcontroller = new defaulteventcontroller(config); } return defaulteventcontroller; } private defaulteventcontroller(eventcontrolconfig config){ if (config == null) { throw new illegalargumentexception("config can not be null."); } this.config = config; initrabbitconnectionfactory(); // 初始化amqpadmin rabbitadmin = new rabbitadmin(rabbitconnectionfactory); // 初始化rabbittemplate rabbittemplate rabbittemplate = new rabbittemplate(rabbitconnectionfactory); rabbittemplate.setmessageconverter(serializermessageconverter); eventtemplate = new defaulteventtemplate(rabbittemplate,defaultcodecfactory, this); } /** * 初始化rabbitmq连接 */ private void initrabbitconnectionfactory() { rabbitconnectionfactory = new cachingconnectionfactory(); rabbitconnectionfactory.sethost(config.getserverhost()); rabbitconnectionfactory.setchannelcachesize(config.geteventmsgprocessnum()); rabbitconnectionfactory.setport(config.getport()); rabbitconnectionfactory.setusername(config.getusername()); rabbitconnectionfactory.setpassword(config.getpassword()); if (!stringutils.isempty(config.getvirtualhost())) { rabbitconnectionfactory.setvirtualhost(config.getvirtualhost()); } } /** * 注销程序 */ public synchronized void destroy() throws exception { if (!isstarted.get()) { return; } msglistenercontainer.stop(); eventtemplate = null; rabbitadmin = null; rabbitconnectionfactory.destroy(); } @override public void start() { if (isstarted.get()) { return; } set<string> mapping = msgadapterhandler.getallbinding(); for (string relation : mapping) { string[] relaarr = relation.split("\\|"); declarebinding(relaarr[1], relaarr[0]); } initmsglisteneradapter(); isstarted.set(true); } /** * 初始化消息监听器容器 */ private void initmsglisteneradapter(){ messagelistener listener = new messagelisteneradapter(msgadapterhandler,serializermessageconverter); msglistenercontainer = new simplemessagelistenercontainer(); msglistenercontainer.setconnectionfactory(rabbitconnectionfactory); msglistenercontainer.setacknowledgemode(acknowledgemode.auto); msglistenercontainer.setmessagelistener(listener); msglistenercontainer.seterrorhandler(new messageerrorhandler()); msglistenercontainer.setprefetchcount(config.getprefetchsize()); // 设置每个消费者消息的预取值 msglistenercontainer.setconcurrentconsumers(config.geteventmsgprocessnum()); msglistenercontainer.settxsize(config.getprefetchsize());//设置有事务时处理的消息数 msglistenercontainer.setqueues(queues.values().toarray(new queue[queues.size()])); msglistenercontainer.start(); } @override public eventtemplate geteopeventtemplate() { return eventtemplate; } @override public eventcontroller add(string queuename, string exchangename,eventprocesser eventprocesser) { return add(queuename, exchangename, eventprocesser, defaultcodecfactory); } public eventcontroller add(string queuename, string exchangename,eventprocesser eventprocesser,codecfactory codecfactory) { msgadapterhandler.add(queuename, exchangename, eventprocesser, defaultcodecfactory); if(isstarted.get()){ initmsglisteneradapter(); } return this; } @override public eventcontroller add(map<string, string> bindings, eventprocesser eventprocesser) { return add(bindings, eventprocesser,defaultcodecfactory); } public eventcontroller add(map<string, string> bindings, eventprocesser eventprocesser, codecfactory codecfactory) { for(map.entry<string, string> item: bindings.entryset()) msgadapterhandler.add(item.getkey(),item.getvalue(), eventprocesser,codecfactory); return this; } /** * exchange和queue是否已经绑定 */ protected boolean bebinded(string exchangename, string queuename) { return binded.contains(exchangename+"|"+queuename); } /** * 声明exchange和queue已经它们的绑定关系 */ protected synchronized void declarebinding(string exchangename, string queuename) { string bindrelation = exchangename+"|"+queuename; if (binded.contains(bindrelation)) return; boolean needbinding = false; directexchange directexchange = exchanges.get(exchangename); if(directexchange == null) { directexchange = new directexchange(exchangename, true, false, null); exchanges.put(exchangename, directexchange); rabbitadmin.declareexchange(directexchange);//声明exchange needbinding = true; } queue queue = queues.get(queuename); if(queue == null) { queue = new queue(queuename, true, false, false); queues.put(queuename, queue); rabbitadmin.declarequeue(queue); //声明queue needbinding = true; } if(needbinding) { binding binding = bindingbuilder.bind(queue).to(directexchange).with(queuename);//将queue绑定到exchange rabbitadmin.declarebinding(binding);//声明绑定关系 binded.add(bindrelation); } } }
搞定,现在可以将defaulteventtemplate里的注释去掉了,接下来最后完成单元测试,为了测试传递对象,建立一个po
@suppresswarnings("serial") public class people implements serializable{ private int id; private string name; private boolean male; private people spouse; private list<people> friends; public int getid() { return id; } public void setid(int id) { this.id = id; } public string getname() { return name; } public void setname(string name) { this.name = name; } public boolean ismale() { return male; } public void setmale(boolean male) { this.male = male; } public people getspouse() { return spouse; } public void setspouse(people spouse) { this.spouse = spouse; } public list<people> getfriends() { return friends; } public void setfriends(list<people> friends) { this.friends = friends; } @override public string tostring() { // todo auto-generated method stub return "people[id="+id+",name="+name+",male="+male+"]"; } }
建立单元测试
public class rabbitmqtest{ private string defaulthost = "127.0.0.1"; private string defaultexchange = "exchange_direct_test"; private string defaultqueue = "queue_test"; private defaulteventcontroller controller; private eventtemplate eventtemplate; @before public void init() throws ioexception{ eventcontrolconfig config = new eventcontrolconfig(defaulthost); controller = defaulteventcontroller.getinstance(config); eventtemplate = controller.geteopeventtemplate(); controller.add(defaultqueue, defaultexchange, new apiprocesseventprocessor()); controller.start(); } @test public void sendstring() throws sendrefuseexception{ eventtemplate.send(defaultqueue, defaultexchange, "hello world"); } @test public void sendobject() throws sendrefuseexception{ eventtemplate.send(defaultqueue, defaultexchange, mockobj()); } @test public void sendtemp() throws sendrefuseexception, interruptedexception{ string tempexchange = "exchange_direct_test_temp";//以前未声明的exchange string tempqueue = "queue_test_temp";//以前未声明的queue eventtemplate.send(tempqueue, tempexchange, mockobj()); //发送成功后此时不会接受到消息,还需要绑定对应的消费程序 controller.add(tempqueue, tempexchange, new apiprocesseventprocessor()); } @after public void end() throws interruptedexception{ thread.sleep(2000); } private people mockobj(){ people jack = new people(); jack.setid(1); jack.setname("jack"); jack.setmale(true); list<people> friends = new arraylist<>(); friends.add(jack); people hanmeimei = new people(); hanmeimei.setid(1); hanmeimei.setname("韩梅梅"); hanmeimei.setmale(false); hanmeimei.setfriends(friends); people lilei = new people(); lilei.setid(2); lilei.setname("李雷"); lilei.setmale(true); lilei.setfriends(friends); lilei.setspouse(hanmeimei); hanmeimei.setspouse(lilei); return hanmeimei; } class apiprocesseventprocessor implements eventprocesser{ @override public void process(object e) {//消费程序这里只是打印信息 assert.assertnotnull(e); system.out.println(e); if(e instanceof people){ people people = (people)e; system.out.println(people.getspouse()); system.out.println(people.getfriends()); } } } }
源码地址请点击
总结
以上就是本文关于java实现rabbitmq消息的发送接受的全部内容,希望对大家有所帮助。
感谢大家对本站的支持。
下一篇: .NET 单点登录解决方案