浅谈使用java实现阿里云消息队列简单封装
一、前言
最近公司有使用阿里云消息队列的需求,为了更加方便使用,本人用了几天时间将消息队列封装成api调用方式以方便内部系统的调用,现在已经完成,特此记录其中过程和使用到的相关技术,与君共勉。
现在阿里云提供了两种消息服务:mns服务和ons服务,其中我认为mns是简化版的ons,而且mns的消息消费需要自定义轮询策略的,相比之下,ons的发布与订阅模式功能更加强大(比如相对于mns,ons提供了消息追踪、日志、监控等功能),其api使用起来更加方便,而且听闻阿里内部以后不再对mns进行新的开发,只做维护,ons服务则会逐步替代mns服务成为阿里消息服务的主打产品,所以,如果有使用消息队列的需求,建议不要再使用mns,使用ons是最好的选择。
涉及到的技术:spring,反射、动态代理、jackson序列化和反序列化
在看下面的文章之前,需要先看上面的文档以了解相关概念(topic、consumer、producer、tag等)以及文档中提供的简单的发送和接收代码实现。
该博文只针对有消息队列知识基础的朋友看,能帮上大家的忙我自然很高兴,看不懂的也不要骂,说明你路子不对。
二、设计方案
1.消息发送
在一个简单的cs架构中,假设server会监听一个topic的producer发送的消息,那么它首先应该提供client一个api,client只需要简单的调用该api,就可以通过producer来生产消息
2.消息接收
由于api是server制定的,所以server当然也知道如何消费这些消息
在这个过程中,server实际充当着消费者的角色,client实际充当着生产者的角色,但是生产者生产消息的规则则由消费者制定以满足消费者消费需求。
3.最终目标
我们要创建一个单独的jar包,起名为queue-core为生产者和消费者提供依赖和发布订阅的具体实现。
三、消息发送
1.消费者提供接口
@topic(name="kdyzm",producerid="kdyzm_producer") public interface userqueueresource { @tag("test1") public void handleuserinfo(@body @key("userinfohandler") usermodel user); @tag("test2") public void handleuserinfo1(@body @key("userinfohandler1") usermodel user); }
由于topic和producer之间是n:1的关系,所以这里直接将producerid作为topic的一个属性;tag是一个很关键的过滤条件,消费者通过它进行消息的分类做不同的业务处理,所以,这里使用tag作为路由条件。
2.生产者使用消费者提供的api发送消息
由于消费者只提供了接口给生产者使用,接口是没有办法直接使用的,因为没有办法实例化,这里使用动态代理生成对象,在消费者提供的api中,添加如下config,以方便生产者直接导入config即可使用,这里使用了基于java的spring config,请知悉。
@configuration public class queueconfig { @autowired @bean public userqueueresource userqueueresource() { return queueresourcefactory.createproxyqueueresource(userqueueresource.class); } }
3.queue-core对生产者发送消息的封装
以上1中所有的注解(topic、tag、body 、key)以及2中使用到的queueresourcefactory类都要在queue-core中定义,其中注解的定义只是定义了规则,真正的实现实际上是在queueresourcefactory中
import java.lang.reflect.invocationhandler; import java.lang.reflect.method; import java.lang.reflect.proxy; import org.slf4j.logger; import org.slf4j.loggerfactory; import com.aliyun.openservices.ons.api.message; import com.aliyun.openservices.ons.api.producer; import com.aliyun.openservices.ons.api.sendresult; import com.wy.queue.core.api.mqconnection; import com.wy.queue.core.utils.jacksonserializer; import com.wy.queue.core.utils.mqutils; import com.wy.queue.core.utils.queuecorespringutils; public class queueresourcefactory implements invocationhandler { private static final logger logger=loggerfactory.getlogger(queueresourcefactory.class); private string topicname; private string producerid; private jacksonserializer serializer=new jacksonserializer(); private static final string prefix="pid_"; public queueresourcefactory(string topicname,string producerid) { this.topicname = topicname; this.producerid=producerid; } public static <t> t createproxyqueueresource(class<t> clazz) { string topicname = mqutils.gettopicname(clazz); string producerid = mqutils.getproducerid(clazz); t target = (t) proxy.newproxyinstance(queueresourcefactory.class.getclassloader(), new class<?>[] { clazz }, new queueresourcefactory(topicname,producerid)); return target; } @override public object invoke(object proxy, method method, object[] args) throws throwable { if(args.length == 0 || args.length>1){ throw new runtimeexception("only accept one param at queueresource interface."); } string tagname=mqutils.gettagname(method); producerfactory producerfactory = queuecorespringutils.getbean(producerfactory.class); mqconnection connectioninfo = queuecorespringutils.getbean(mqconnection.class); producer producer = producerfactory.createproducer(prefix+connectioninfo.getprefix()+"_"+producerid); //发送消息 message msg = new message( // // 在控制台创建的 topic,即该消息所属的 topic 名称 connectioninfo.getprefix()+"_"+topicname, // message tag, // 可理解为 gmail 中的标签,对消息进行再归类,方便 consumer 指定过滤条件在 mq 服务器过滤 tagname, // message body // 任何二进制形式的数据, mq 不做任何干预, // 需要 producer 与 consumer 协商好一致的序列化和反序列化方式 serializer.serialize(args[0]).getbytes()); sendresult sendresult = producer.send(msg); logger.info("send message success. message id is: " + sendresult.getmessageid()); return null; } }
这里特意将自定义包和第三方使用的包名都贴过来了,以便于区分。
这里到底做了哪些事情呢?
发送消息的过程就是动态代理创建一个代理对象,该对象调用方法的时候会被拦截,首先解析所有的注解,比如topicname、producerid、tag等关键信息从注解中取出来,然后调用阿里sdk发送消息,过程很简单,但是注意,这里发送消息的时候是分环境的,一般来讲现在企业中会区分qa、staging、product三种环境,其中qa和staging是测试环境,对于消息队列来讲,也是会有三种环境的,但是qa和staging环境往往为了降低成本使用同一个阿里账号,所以创建的topic和productid会放到同一个区域下,这样同名的topicname是不允许存在的,所以加上了环境前缀加以区分,比如qa_topicname,pid_staging_producerid等等;另外,queue-core提供了mqconnection接口,以获取配置信息,生产者服务只需要实现该接口即可。
4.生产者发送消息
@autowired private userqueueresource userqueueresource; @override public void sendmessage() { usermodel usermodel=new usermodel(); usermodel.setname("kdyzm"); usermodel.setage(25); userqueueresource.handleuserinfo(usermodel); }
只需要数行代码即可将消息发送到指定的topic,相对于原生的发送代码,精简了太多。
四、消息消费
相对于消息发送,消息的消费要复杂一些。
1.消息消费设计
由于topic和consumer之间是n:n的关系,所以将consumerid放到消费者具体实现的方法上
@controller @queueresource public class userqueueresourceimpl implements userqueueresource { private logger logger = loggerfactory.getlogger(this.getclass()); @override @consumerannotation("kdyzm_consumer") public void handleuserinfo(usermodel user) { logger.info("收到消息1:{}", new gson().tojson(user)); } @override @consumerannotation("kdyzm_consumer1") public void handleuserinfo1(usermodel user) { logger.info("收到消息2:{}", new gson().tojson(user)); } }
这里又有两个新的注解@queueresource和@consumerannotation,这两个注解后续会讨论如何使用。有人会问我为什么要使用consumerannotation这个名字而不使用consumer这个名字,因为consumer这个名字和aliyun提供的sdk中的名字冲突了。。。。
在这里, 消费者提供api 接口给生产者以方便生产者发送消息,消费者则实现该接口以消费生产者发送的消息,如何实现api接口就实现了监听,这点是比较关键的逻辑。
2.queue-core实现消息队列监听核心逻辑
第一步:使用sping 容器的监听方法获取所有加上queueresource注解的bean
第二步:分发处理bean
如何处理这些bean呢,每个bean实际上都是一个对象,有了对象,比如上面例子中的userqueueresourceimpl 对象,我们可以拿到该对象实现的接口字节码对象,进而可以拿到该接口userqueuererousce上的注解以及方法上和方法中的注解,当然userqueueresourceimpl实现方法上的注解也能拿得到,这里我将获取到的信息以consumerid为key,其余相关信息封装为value缓存到了一个map对象中,核心代码如下:
class<?> clazz = resourceimpl.getclass(); class<?> clazzif = clazz.getinterfaces()[0]; method[] methods = clazz.getmethods(); string topicname = mqutils.gettopicname(clazzif); for (method m : methods) { consumerannotation consumeranno = m.getannotation(consumerannotation.class); if (null == consumeranno) { // logger.error("method={} need consumer annotation.", m.getname()); continue; } string consuerid = consumeranno.value(); if (stringutils.isempty(consuerid)) { logger.error("method={} consumerid can't be null", m.getname()); continue; } class<?>[] parametertypes = m.getparametertypes(); method resourceifmethod = null; try { resourceifmethod = clazzif.getmethod(m.getname(), parametertypes); } catch (nosuchmethodexception | securityexception e) { logger.error("can't find method={} at super interface={} .", m.getname(), clazzif.getcanonicalname(), e); continue; } string tagname = mqutils.gettagname(resourceifmethod); consumersmap.put(consuerid, new methodinfo(topicname, tagname, m)); }
第三步:通过反射实现消费的动作
首先,先确定好反射动作执行的时机,那就是监听到了新的消息
其次,如何执行反射动作?不赘述,有反射相关基础的童鞋都知道怎么做,核心代码如下所示:
mqconnection connectioninfo = queuecorespringutils.getbean(mqconnection.class); string topicprefix=connectioninfo.getprefix()+"_"; string consumeridprefix=prefix+connectioninfo.getprefix()+"_"; for(string consumerid:consumersmap.keyset()){ methodinfo methodinfo=consumersmap.get(consumerid); properties connectionproperties=converttoproperties(connectioninfo); // 您在控制台创建的 consumer id connectionproperties.put(propertykeyconst.consumerid, consumeridprefix+consumerid); consumer consumer = onsfactory.createconsumer(connectionproperties); consumer.subscribe(topicprefix+methodinfo.gettopicname(), methodinfo.gettagname(), new messagelistener() { //订阅多个tag public action consume(message message, consumecontext context) { try { string messagebody=new string(message.getbody(),"utf-8"); logger.info("receive message from topic={},tag={},consumerid={},message={}",topicprefix+methodinfo.gettopicname(),methodinfo.gettagname(),consumeridprefix+consumerid,messagebody); method method=methodinfo.getmethod(); class<?> parametype = method.getparametertypes()[0]; object arg = jacksonserializer.deserialize(messagebody, parametype); object[] args={arg}; method.invoke(resourceimpl, args); } catch (exception e) { logger.error("",e); } return action.commitmessage; } }); consumer.start(); logger.info("consumer={} has started.",consumeridprefix+consumerid); }
五、完整代码见下面的git链接
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。