欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Java编程rabbitMQ实现消息的收发

程序员文章站 2024-02-29 20:20:04
java实现ramqp,即advanced message queuing protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间...

java实现ramqp,即advanced message queuing protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

amqp的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

rabbitmq是一个开源的amqp实现,服务器端用erlang语言编写,支持多种客户端,如:python、ruby、.net、java、jms、c、php、actionscript、xmpp、stomp等,支持ajax。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

本文不介绍amqp和rabbitmq相关知识,请自行网上查阅

本文是基于spring-rabbit中间件来实现消息的发送接受功能

see

see

java编程通过操作rabbitmq消息的收发实现代码如下:

<!-- for rabbitmq -->
 	<dependency>
		<groupid>com.rabbitmq</groupid>
		<artifactid>amqp-client</artifactid>
		<version>2.8.2</version>
	</dependency>
	<dependency>
		<groupid>org.springframework.amqp</groupid>
		<artifactid>spring-amqp</artifactid>
		<version>1.1.1.release</version>
	</dependency>
	<dependency>
		<groupid>org.springframework.amqp</groupid>
		<artifactid>spring-rabbit</artifactid>
		<version>1.1.1.release</version>
	</dependency>
	<dependency>
		<groupid>com.caucho</groupid>
		<artifactid>hessian</artifactid>
		<version>4.0.7</version>
	</dependency>
 </dependencies>

首先我们需要一个用来在app和rabbitmq之间传递消息的持有对象

public class eventmessage implements serializable{
	private string queuename;
	private string exchangename;
	private byte[] eventdata;
	public eventmessage(string queuename, string exchangename, byte[] eventdata) {
		this.queuename = queuename;
		this.exchangename = exchangename;
		this.eventdata = eventdata;
	}
	public eventmessage() {
	}	
	public string getqueuename() {
		return queuename;
	}
	public string getexchangename() {
		return exchangename;
	}
	public byte[] geteventdata() {
		return eventdata;
	}
	@override
	public string tostring() {
		return "eopeventmessage [queuename=" + queuename + ", exchangename="
				+ exchangename + ", eventdata=" + arrays.tostring(eventdata)
				+ "]";
	}
}

为了可以发送和接受这个消息持有对象,我们还需要需要一个用来序列化和反序列化的工厂

public interface codecfactory {
	byte[] serialize(object obj) throws ioexception;
	object deserialize(byte[] in) throws ioexception;
}

下面是编码解码的实现类,用了hessian来实现,大家可以自行选择序列化方式

public class hessioncodecfactory implements codecfactory {
	private final logger logger = logger.getlogger(hessioncodecfactory.class);
	@override
	public byte[] serialize(object obj) throws ioexception {
		bytearrayoutputstream baos = null;
		hessianoutput output = null;
		try {
			baos = new bytearrayoutputstream(1024);
			output = new hessianoutput(baos);
			output.startcall();
			output.writeobject(obj);
			output.completecall();
		} catch (final ioexception ex) {
			throw ex;
		} finally {
			if (output != null) {
				try {
					baos.close();
				} catch (final ioexception ex) {
					this.logger.error("failed to close stream.", ex);
				}
			}
		}
		return baos != null ? baos.tobytearray() : null;
	}
	@override
	public object deserialize(byte[] in) throws ioexception {
		object obj = null;
		bytearrayinputstream bais = null;
		hessianinput input = null;
		try {
			bais = new bytearrayinputstream(in);
			input = new hessianinput(bais);
			input.startreply();
			obj = input.readobject();
			input.completereply();
		} catch (final ioexception ex) {
			throw ex;
		} catch (final throwable e) {
			this.logger.error("failed to decode object.", e);
		} finally {
			if (input != null) {
				try {
					bais.close();
				} catch (final ioexception ex) {
					this.logger.error("failed to close stream.", ex);
  }
  }
 }
 return obj;
	}
}

接下来就先实现发送功能,新增一个接口专门用来实现发送功能

public interface eventtemplate {
	void send(string queuename,string exchangename,object eventcontent) throws sendrefuseexception;
	void send(string queuename,string exchangename,object eventcontent,codecfactory codecfactory) throws sendrefuseexception;
}

sendrefuseexception是自定义的发送失败异常类
下面是它的实现类,主要的任务就是将数据转换为eventmessage

public class defaulteventtemplate implements eventtemplate {
	private static final logger logger = logger.getlogger(defaulteventtemplate.class);
	private amqptemplate eventamqptemplate;
	private codecfactory defaultcodecfactory;
//	private defaulteventcontroller eec;
//	public defaulteventtemplate(amqptemplate eopamqptemplate,
//			codecfactory defaultcodecfactory, defaulteventcontroller eec) {
//		this.eventamqptemplate = eopamqptemplate;
//		this.defaultcodecfactory = defaultcodecfactory;
//		this.eec = eec;
//	}
	public defaulteventtemplate(amqptemplate eopamqptemplate,codecfactory defaultcodecfactory) {
		this.eventamqptemplate = eopamqptemplate;
		this.defaultcodecfactory = defaultcodecfactory;
	}
	@override
	public void send(string queuename, string exchangename, object eventcontent)
			throws sendrefuseexception {
		this.send(queuename, exchangename, eventcontent, defaultcodecfactory);
	} 
	@override
	public void send(string queuename, string exchangename, object eventcontent,
			codecfactory codecfactory) throws sendrefuseexception {
		if (stringutils.isempty(queuename) || stringutils.isempty(exchangename)) {
			throw new sendrefuseexception("queuename exchangename can not be empty.");
		}
//		if (!eec.bebinded(exchangename, queuename))
//			eec.declarebinding(exchangename, queuename);
		byte[] eventcontentbytes = null;
		if (codecfactory == null) {
			if (eventcontent == null) {
				logger.warn("find eventcontent is null,are you sure...");
			} else {
				throw new sendrefuseexception(
						"codecfactory must not be null ,unless eventcontent is null");
			}
		} else {
			try {
				eventcontentbytes = codecfactory.serialize(eventcontent);
			} catch (ioexception e) {
				throw new sendrefuseexception(e);
			}
		}
		// 构造成message
		eventmessage msg = new eventmessage(queuename, exchangename,
				eventcontentbytes);
		try {
			eventamqptemplate.convertandsend(exchangename, queuename, msg);
		} catch (amqpexception e) {
			logger.error("send event fail. event message : [" + eventcontent + "]", e);
			throw new sendrefuseexception("send event fail", e);
		}
	}
}

注释的地方稍后会用到,主要是防止数据数据发送的地方没有事先声明
然后我们再实现接受消息
首先我们需要一个消费接口,所有的消费程序都实现这个类

public interface eventprocesser {
	public void process(object e);
}

 为了能够将不同类型的消息交由对应的程序来处理,我们还需要一个消息处理适配器

/**
 * messagelisteneradapter的pojo
 * <p>消息处理适配器,主要功能:</p>
 * <p>1、将不同的消息类型绑定到对应的处理器并本地缓存,如将queue01+exchange01的消息统一交由a处理器来出来</p>
 * <p>2、执行消息的消费分发,调用相应的处理器来消费属于它的消息</p>
 * 
 */
public class messageadapterhandler {
	private static final logger logger = logger.getlogger(messageadapterhandler.class);
	private concurrentmap<string, eventprocessorwrap> epwmap;
	public messageadapterhandler() {
		this.epwmap = new concurrenthashmap<string, eventprocessorwrap>();
	}
	public void handlemessage(eventmessage eem) {
		logger.debug("receive an eventmessage: [" + eem + "]");
		// 先要判断接收到的message是否是空的,在某些异常情况下,会产生空值
		if (eem == null) {
			logger.warn("receive an null eventmessage, it may product some errors, and processing message is canceled.");
			return;
		}
		if (stringutils.isempty(eem.getqueuename()) || stringutils.isempty(eem.getexchangename())) {
			logger.warn("the eventmessage's queuename and exchangename is empty, this is not allowed, and processing message is canceled.");
			return;
		}
		// 解码,并交给对应的eventhandle执行
		eventprocessorwrap eepw = epwmap.get(eem.getqueuename()+"|"+eem.getexchangename());
		if (eepw == null) {
			logger.warn("receive an eopeventmessage, but no processor can do it.");
			return;
		}
		try {
			eepw.process(eem.geteventdata());
		} catch (ioexception e) {
			logger.error("event content can not be deserialized, check the provided codecfactory.",e);
			return;
		}
	}
	protected void add(string queuename, string exchangename, eventprocesser processor,codecfactory codecfactory) {
		if (stringutils.isempty(queuename) || stringutils.isempty(exchangename) || processor == null || codecfactory == null) {
			throw new runtimeexception("queuename and exchangename can not be empty,and processor or codecfactory can not be null. ");
		}
		eventprocessorwrap epw = new eventprocessorwrap(codecfactory,processor);
		eventprocessorwrap oldprocessorwrap = epwmap.putifabsent(queuename + "|" + exchangename, epw);
		if (oldprocessorwrap != null) {
			logger.warn("the processor of this queue and exchange exists, and the new one can't be add");
		}
	}
	protected set<string> getallbinding() {
		set<string> keyset = epwmap.keyset();
		return keyset;
	}
	protected static class eventprocessorwrap {
		private codecfactory codecfactory;
		private eventprocesser eep;
		protected eventprocessorwrap(codecfactory codecfactory,
				eventprocesser eep) {
			this.codecfactory = codecfactory;
			this.eep = eep;
		}
		public void process(byte[] eventdata) throws ioexception{
			object obj = codecfactory.deserialize(eventdata);
			eep.process(obj);
		}
	}
}

这是正常情况下的消息处理方式,如果rabbitmq消息接受发生异常,也要监控到,新增一个消费类专门用来处理错误异常的消息

public class messageerrorhandler implements errorhandler{
	private static final logger logger = logger.getlogger(messageerrorhandler.class);
	@override
	public void handleerror(throwable t) {
		logger.error("rabbitmq happen a error:" + t.getmessage(), t);
	}
}

接下来我们可能需要一个专门配置和rabbitmq通信的一些信息,比如地址,端口等信息

public class eventcontrolconfig {
	private final static int default_port = 5672;
	private final static string default_username = "guest";
	private final static string default_password = "guest";
	private final static int default_process_thread_num = runtime.getruntime().availableprocessors() * 2;
	private static final int prefetch_size = 1;
	private string serverhost ;
	private int port = default_port;
	private string username = default_username;
	private string password = default_password;
	private string virtualhost;
	/**
	 * 和rabbitmq建立连接的超时时间
	 */
	private int connectiontimeout = 0;
	/**
	 * 事件消息处理线程数,默认是 cpu核数 * 2
	 */
	private int eventmsgprocessnum;
	/**
	 * 每次消费消息的预取值
	 */
	private int prefetchsize;
	
	public eventcontrolconfig(string serverhost) {
		this(serverhost,default_port,default_username,default_password,null,0,default_process_thread_num,default_process_thread_num,new hessioncodecfactory());
	}
	public eventcontrolconfig(string serverhost, int port, string username,
			string password, string virtualhost, int connectiontimeout,
			int eventmsgprocessnum,int prefetchsize,codecfactory defaultcodecfactory) {
		this.serverhost = serverhost;
		this.port = port>0?port:default_port;
		this.username = username;
		this.password = password;
		this.virtualhost = virtualhost;
		this.connectiontimeout = connectiontimeout>0?connectiontimeout:0;
		this.eventmsgprocessnum = eventmsgprocessnum>0?eventmsgprocessnum:default_process_thread_num;
		this.prefetchsize = prefetchsize>0?prefetchsize:prefetch_size;
	}
	public string getserverhost() {
		return serverhost;
	}
	public int getport() {
		return port;
	}
	public string getusername() {
		return username;
	}
	public string getpassword() {
		return password;
	}
	public string getvirtualhost() {
		return virtualhost;
	}
	public int getconnectiontimeout() {
		return connectiontimeout;
	}
	public int geteventmsgprocessnum() {
		return eventmsgprocessnum;
	}
	public int getprefetchsize() {
		return prefetchsize;
	}
}

具体的发送、接受程序已经好了,接下来也是最重要的就是管理控制和rabbitmq的通信

public interface eventcontroller {	
	/**
	 * 控制器启动方法
	 */
	void start();
	/**
	 * 获取发送模版
	 */
	eventtemplate geteopeventtemplate();
	/**
	 * 绑定消费程序到对应的exchange和queue
	 */
	eventcontroller add(string queuename, string exchangename, eventprocesser eventprocesser);
	/*in map, the key is queue name, but value is exchange name*/
	eventcontroller add(map<string,string> bindings, eventprocesser eventprocesser);
}

它的实现类如下:

/**
 * 和rabbitmq通信的控制器,主要负责:
 * <p>1、和rabbitmq建立连接</p>
 * <p>2、声明exchange和queue以及它们的绑定关系</p>
 * <p>3、启动消息监听容器,并将不同消息的处理者绑定到对应的exchange和queue上</p>
 * <p>4、持有消息发送模版以及所有exchange、queue和绑定关系的本地缓存</p>
 * @author yangyong
 *
 */
public class defaulteventcontroller implements eventcontroller {
	private cachingconnectionfactory rabbitconnectionfactory;
	private eventcontrolconfig config;
	private rabbitadmin rabbitadmin;
	private codecfactory defaultcodecfactory = new hessioncodecfactory();
	private simplemessagelistenercontainer msglistenercontainer; // rabbitmq msg listener container
	private messageadapterhandler msgadapterhandler = new messageadapterhandler();
	private messageconverter serializermessageconverter = new serializermessageconverter(); // 直接指定
	//queue cache, key is exchangename
	private map<string, directexchange> exchanges = new hashmap<string,directexchange>();
	//queue cache, key is queuename
	private map<string, queue> queues = new hashmap<string, queue>();
	//bind relation of queue to exchange cache, value is exchangename | queuename
	private set<string> binded = new hashset<string>();
	private eventtemplate eventtemplate; // 给app使用的event发送客户端
	private atomicboolean isstarted = new atomicboolean(false);
	private static defaulteventcontroller defaulteventcontroller;
	public synchronized static defaulteventcontroller getinstance(eventcontrolconfig config){
		if(defaulteventcontroller==null){
			defaulteventcontroller = new defaulteventcontroller(config);
		}
		return defaulteventcontroller;
	}
	private defaulteventcontroller(eventcontrolconfig config){
		if (config == null) {
			throw new illegalargumentexception("config can not be null.");
		}
		this.config = config;
		initrabbitconnectionfactory();
		// 初始化amqpadmin
		rabbitadmin = new rabbitadmin(rabbitconnectionfactory);
		// 初始化rabbittemplate
		rabbittemplate rabbittemplate = new rabbittemplate(rabbitconnectionfactory);
		rabbittemplate.setmessageconverter(serializermessageconverter);
		eventtemplate = new defaulteventtemplate(rabbittemplate,defaultcodecfactory, this);
	}
	/**
	 * 初始化rabbitmq连接
	 */
	private void initrabbitconnectionfactory() {
		rabbitconnectionfactory = new cachingconnectionfactory();
		rabbitconnectionfactory.sethost(config.getserverhost());
		rabbitconnectionfactory.setchannelcachesize(config.geteventmsgprocessnum());
		rabbitconnectionfactory.setport(config.getport());
		rabbitconnectionfactory.setusername(config.getusername());
		rabbitconnectionfactory.setpassword(config.getpassword());
		if (!stringutils.isempty(config.getvirtualhost())) {
			rabbitconnectionfactory.setvirtualhost(config.getvirtualhost());
		}
	}
	/**
	 * 注销程序
	 */
	public synchronized void destroy() throws exception {
		if (!isstarted.get()) {
			return;
		}
		msglistenercontainer.stop();
		eventtemplate = null;
		rabbitadmin = null;
		rabbitconnectionfactory.destroy();
	}
	@override
	public void start() {
		if (isstarted.get()) {
			return;
		}
		set<string> mapping = msgadapterhandler.getallbinding();
		for (string relation : mapping) {
			string[] relaarr = relation.split("\\|");
			declarebinding(relaarr[1], relaarr[0]);
		}
		initmsglisteneradapter();
		isstarted.set(true);
	}
	/**
	 * 初始化消息监听器容器
	 */
	private void initmsglisteneradapter(){
		messagelistener listener = new messagelisteneradapter(msgadapterhandler,serializermessageconverter);
		msglistenercontainer = new simplemessagelistenercontainer();
		msglistenercontainer.setconnectionfactory(rabbitconnectionfactory);
		msglistenercontainer.setacknowledgemode(acknowledgemode.auto);
		msglistenercontainer.setmessagelistener(listener);
		msglistenercontainer.seterrorhandler(new messageerrorhandler());
		msglistenercontainer.setprefetchcount(config.getprefetchsize()); // 设置每个消费者消息的预取值
		msglistenercontainer.setconcurrentconsumers(config.geteventmsgprocessnum());
		msglistenercontainer.settxsize(config.getprefetchsize());//设置有事务时处理的消息数
		msglistenercontainer.setqueues(queues.values().toarray(new queue[queues.size()]));
		msglistenercontainer.start();
	}
	@override
	public eventtemplate geteopeventtemplate() {
		return eventtemplate;
	}
	@override
	public eventcontroller add(string queuename, string exchangename,eventprocesser eventprocesser) {
		return add(queuename, exchangename, eventprocesser, defaultcodecfactory);
	}
	
	public eventcontroller add(string queuename, string exchangename,eventprocesser eventprocesser,codecfactory codecfactory) {
		msgadapterhandler.add(queuename, exchangename, eventprocesser, defaultcodecfactory);
		if(isstarted.get()){
			initmsglisteneradapter();
		}
		return this;
	}
	@override
	public eventcontroller add(map<string, string> bindings,
			eventprocesser eventprocesser) {
		return add(bindings, eventprocesser,defaultcodecfactory);
	}

	public eventcontroller add(map<string, string> bindings,
			eventprocesser eventprocesser, codecfactory codecfactory) {
		for(map.entry<string, string> item: bindings.entryset()) 
			msgadapterhandler.add(item.getkey(),item.getvalue(), eventprocesser,codecfactory);
		return this;
	}
	/**
	 * exchange和queue是否已经绑定
	 */
	protected boolean bebinded(string exchangename, string queuename) {
		return binded.contains(exchangename+"|"+queuename);
	}
	/**
	 * 声明exchange和queue已经它们的绑定关系
	 */
	protected synchronized void declarebinding(string exchangename, string queuename) {
		string bindrelation = exchangename+"|"+queuename;
		if (binded.contains(bindrelation)) return;
		
		boolean needbinding = false;
		directexchange directexchange = exchanges.get(exchangename);
		if(directexchange == null) {
			directexchange = new directexchange(exchangename, true, false, null);
			exchanges.put(exchangename, directexchange);
			rabbitadmin.declareexchange(directexchange);//声明exchange
			needbinding = true;
		}
		queue queue = queues.get(queuename);
		if(queue == null) {
			queue = new queue(queuename, true, false, false);
			queues.put(queuename, queue);
			rabbitadmin.declarequeue(queue);	//声明queue
			needbinding = true;
		}
		if(needbinding) {
			binding binding = bindingbuilder.bind(queue).to(directexchange).with(queuename);//将queue绑定到exchange
			rabbitadmin.declarebinding(binding);//声明绑定关系
			binded.add(bindrelation);
		}
	}
}

搞定,现在可以将defaulteventtemplate里的注释去掉了,接下来最后完成单元测试,为了测试传递对象,建立一个po

@suppresswarnings("serial")
public class people implements serializable{
	private int id;
	private string name;
	private boolean male;
	private people spouse;
	private list<people> friends;
	public int getid() {
		return id;
	}
	public void setid(int id) {
		this.id = id;
	}
	public string getname() {
		return name;
	}
	public void setname(string name) {
		this.name = name;
	}
	public boolean ismale() {
		return male;
	}
	public void setmale(boolean male) {
		this.male = male;
	}
	public people getspouse() {
		return spouse;
	}
	public void setspouse(people spouse) {
		this.spouse = spouse;
	}
	public list<people> getfriends() {
		return friends;
	}
	public void setfriends(list<people> friends) {
		this.friends = friends;
	}
	@override
	public string tostring() {
		// todo auto-generated method stub
		return "people[id="+id+",name="+name+",male="+male+"]";
	}
}

建立单元测试

public class rabbitmqtest{
	private string defaulthost = "127.0.0.1";
	private string defaultexchange = "exchange_direct_test";
	private string defaultqueue = "queue_test";
	private defaulteventcontroller controller;
	private eventtemplate eventtemplate;
	@before
	public void init() throws ioexception{
		eventcontrolconfig config = new eventcontrolconfig(defaulthost);
		controller = defaulteventcontroller.getinstance(config);
		eventtemplate = controller.geteopeventtemplate();
		controller.add(defaultqueue, defaultexchange, new apiprocesseventprocessor());
		controller.start();
	}
	@test
	public void sendstring() throws sendrefuseexception{
		eventtemplate.send(defaultqueue, defaultexchange, "hello world");
	}	
	@test
	public void sendobject() throws sendrefuseexception{
		eventtemplate.send(defaultqueue, defaultexchange, mockobj());
	}	
	@test
	public void sendtemp() throws sendrefuseexception, interruptedexception{
		string tempexchange = "exchange_direct_test_temp";//以前未声明的exchange
		string tempqueue = "queue_test_temp";//以前未声明的queue
		eventtemplate.send(tempqueue, tempexchange, mockobj());
		//发送成功后此时不会接受到消息,还需要绑定对应的消费程序
		controller.add(tempqueue, tempexchange, new apiprocesseventprocessor());
	}	
	@after
	public void end() throws interruptedexception{
		thread.sleep(2000);
	}	
	private people mockobj(){
		people jack = new people();
		jack.setid(1);
		jack.setname("jack");
		jack.setmale(true);
		
		list<people> friends = new arraylist<>();
		friends.add(jack);
		people hanmeimei = new people();
		hanmeimei.setid(1);
		hanmeimei.setname("韩梅梅");
		hanmeimei.setmale(false);
		hanmeimei.setfriends(friends);
		
		people lilei = new people();
		lilei.setid(2);
		lilei.setname("李雷");
		lilei.setmale(true);
		lilei.setfriends(friends);
		lilei.setspouse(hanmeimei);
		hanmeimei.setspouse(lilei);
		return hanmeimei;
	}
	class apiprocesseventprocessor implements eventprocesser{
		@override
		public void process(object e) {//消费程序这里只是打印信息
			assert.assertnotnull(e);
			system.out.println(e);
			if(e instanceof people){
				people people = (people)e;
				system.out.println(people.getspouse());
				system.out.println(people.getfriends());
			}
		}
	}
}

源码地址请点击

总结

以上就是本文关于java实现rabbitmq消息的发送接受的全部内容,希望对大家有所帮助。

感谢大家对本站的支持。