Spring Boot 2.0 Source Code Analysis (2): Integrating ActiveMQ
For the integration steps themselves, see: Spring Boot 2.0 in Practice (2): Integrating ActiveMQ with Spring Boot 2.0
ActiveMQ auto-configuration
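When both javax.jms.Message and org.springframework.jms.core.JmsTemplate are on the classpath, and a ConnectionFactory bean exists, Spring Boot registers the objects needed for ActiveMQ as beans so that the application can inject them. Let's take a look at the JmsAutoConfiguration class.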

@Configuration
@ConditionalOnClass({ Message.class, JmsTemplate.class })
@ConditionalOnBean(ConnectionFactory.class)
@EnableConfigurationProperties(JmsProperties.class)
@Import(JmsAnnotationDrivenConfiguration.class)
public class JmsAutoConfiguration {

    @Configuration
    protected static class JmsTemplateConfiguration {
        ......
        @Bean
        @ConditionalOnMissingBean
        @ConditionalOnSingleCandidate(ConnectionFactory.class)
        public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
            PropertyMapper map = PropertyMapper.get();
            JmsTemplate template = new JmsTemplate(connectionFactory);
            template.setPubSubDomain(this.properties.isPubSubDomain());
            map.from(this.destinationResolver::getIfUnique).whenNonNull()
                    .to(template::setDestinationResolver);
            map.from(this.messageConverter::getIfUnique).whenNonNull()
                    .to(template::setMessageConverter);
            mapTemplateProperties(this.properties.getTemplate(), template);
            return template;
        }
        ......
    }

    @Configuration
    @ConditionalOnClass(JmsMessagingTemplate.class)
    @Import(JmsTemplateConfiguration.class)
    protected static class MessagingTemplateConfiguration {

        @Bean
        @ConditionalOnMissingBean
        @ConditionalOnSingleCandidate(JmsTemplate.class)
        public JmsMessagingTemplate jmsMessagingTemplate(JmsTemplate jmsTemplate) {
            return new JmsMessagingTemplate(jmsTemplate);
        }
    }
}

JmsAutoConfiguration mainly registers JmsMessagingTemplate and JmsTemplate. It also imports JmsAnnotationDrivenConfiguration, which registers a JmsListenerContainerFactory.
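Both template beans in the listing carry @ConditionalOnMissingBean, so the auto-configured default steps aside when the application has already defined one of its own. The idea can be sketched in plain Java. This is a toy registry, not Spring's BeanFactory, and every name in it (BackOffSketch, Template, registerIfMissing) is hypothetical and exists only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Toy bean registry; NOT Spring's BeanFactory. All names are hypothetical.
class BackOffSketch {

    // Stand-in for a template bean; "origin" records who created it.
    static class Template {
        final String origin;

        Template(String origin) {
            this.origin = origin;
        }
    }

    private final Map<String, Object> beans = new LinkedHashMap<>();

    // User configuration registers its beans unconditionally.
    void register(String name, Object bean) {
        beans.put(name, bean);
    }

    // Mimics the idea behind @ConditionalOnMissingBean: the default is only
    // created when no bean of the given type has been registered yet.
    <T> boolean registerIfMissing(String name, Class<T> type, Supplier<T> defaultBean) {
        boolean present = beans.values().stream().anyMatch(type::isInstance);
        if (present) {
            return false; // back off in favour of the existing bean
        }
        beans.put(name, defaultBean.get());
        return true;
    }

    Object get(String name) {
        return beans.get(name);
    }

    public static void main(String[] args) {
        BackOffSketch registry = new BackOffSketch();
        registry.register("myTemplate", new Template("user"));
        boolean created = registry.registerIfMissing(
                "jmsTemplate", Template.class, () -> new Template("auto-configuration"));
        System.out.println("default created: " + created); // prints "default created: false"
    }
}
```

With no user-defined Template registered first, the same call returns true and the default is stored under "jmsTemplate".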
Sending messages
Take the following send call as an example:

jmsMessagingTemplate.convertAndSend(this.queue, msg);

This method first converts and pre-processes the message, and in the end sends it by calling JmsTemplate's doSend.

protected void doSend(Session session, Destination destination, MessageCreator messageCreator)
        throws JMSException {
    Assert.notNull(messageCreator, "MessageCreator must not be null");
    MessageProducer producer = createProducer(session, destination);
    try {
        Message message = messageCreator.createMessage(session);
        doSend(producer, message);
        if (session.getTransacted() && isSessionLocallyTransacted(session)) {
            JmsUtils.commitIfNecessary(session);
        }
    }
    finally {
        JmsUtils.closeMessageProducer(producer);
    }
}

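doSend first creates a MessageProducer, then has the MessageCreator build the javax.jms.Message (this is where the original org.springframework.messaging.Message is converted), and then delegates the actual send to the producer. If the session is locally transacted it is committed afterwards, and the producer is closed in the finally block whether or not the send succeeded.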
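The order of operations in doSend, and the guarantee that the producer is always closed, can be reproduced with a small plain-Java model. This is only a sketch of the control flow: FakeSession and Creator are hypothetical stand-ins, not the javax.jms types, and the "log" list simply records which step ran.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the send flow; these are NOT the javax.jms types.
class SendFlowSketch {

    // Hypothetical stand-in for a JMS session; "log" records each step.
    static class FakeSession {
        final boolean transacted;
        final List<String> log = new ArrayList<>();

        FakeSession(boolean transacted) {
            this.transacted = transacted;
        }
    }

    // Hypothetical stand-in for MessageCreator: builds the message on demand.
    interface Creator {
        String create(FakeSession session);
    }

    static void doSend(FakeSession session, String destination, Creator creator) {
        if (creator == null) {
            throw new IllegalArgumentException("creator must not be null");
        }
        session.log.add("createProducer:" + destination);
        try {
            String message = creator.create(session); // conversion may fail here
            session.log.add("send:" + message);
            if (session.transacted) {
                session.log.add("commit");
            }
        } finally {
            session.log.add("closeProducer"); // runs on success and on failure
        }
    }

    public static void main(String[] args) {
        FakeSession ok = new FakeSession(true);
        doSend(ok, "sample.queue", s -> "hello");
        // prints [createProducer:sample.queue, send:hello, commit, closeProducer]
        System.out.println(ok.log);

        FakeSession failing = new FakeSession(false);
        try {
            doSend(failing, "sample.queue", s -> {
                throw new IllegalStateException("conversion failed");
            });
        } catch (IllegalStateException expected) {
            // prints [createProducer:sample.queue, closeProducer]
            System.out.println(failing.log);
        }
    }
}
```

The second run shows why the close sits in finally: a failure while creating the message skips the send and the commit, but the producer is still released.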
Receiving messages
Let's start with a consumer example:

@Component
public class Consumer {

    @JmsListener(destination = "sample.queue")
    public void receiveQueue(String text) {
        System.out.println(text);
    }
}

Spring parses @JmsListener; the implementation lives in the postProcessAfterInitialization method of JmsListenerAnnotationBeanPostProcessor.

public Object postProcessAfterInitialization(final Object bean, String beanName) throws BeansException {
    if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
        Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
        Map<Method, Set<JmsListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                (MethodIntrospector.MetadataLookup<Set<JmsListener>>) method -> {
                    Set<JmsListener> listenerMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                            method, JmsListener.class, JmsListeners.class);
                    return (!listenerMethods.isEmpty() ? listenerMethods : null);
                });
        if (annotatedMethods.isEmpty()) {
            this.nonAnnotatedClasses.add(bean.getClass());
        }
        else {
            annotatedMethods.forEach((method, listeners) ->
                    listeners.forEach(listener -> processJmsListener(listener, method, bean)));
        }
    }
    return bean;
}

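Using the annotation, Spring finds every method marked with @JmsListener; when a message arrives from ActiveMQ on the listened destination, the corresponding method is invoked. Let's see how a listener is actually bound to a method.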
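The scanning step can be imitated with nothing but java.lang.reflect. The sketch below is a simplified model, not the Spring implementation: @Listener is a hypothetical annotation standing in for @JmsListener, proxies and repeatable annotations are ignored, and the set of non-annotated classes plays the same role as nonAnnotatedClasses in the listing (skip classes already known to have no listener methods).

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Simplified model of annotation scanning; NOT the Spring implementation.
class ListenerScanSketch {

    // Hypothetical annotation standing in for @JmsListener.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Listener {
        String destination();
    }

    static class Consumer {
        @Listener(destination = "sample.queue")
        public void receiveQueue(String text) {
        }

        public void notAListener(String text) {
        }
    }

    static class PlainBean {
    }

    // Classes already known to have no listener methods; skipped on later calls.
    private final Set<Class<?>> nonAnnotatedClasses = new HashSet<>();

    // Returns destination -> method name for every annotated method of the bean.
    Map<String, String> scan(Object bean) {
        Map<String, String> found = new TreeMap<>();
        Class<?> type = bean.getClass();
        if (nonAnnotatedClasses.contains(type)) {
            return found;
        }
        for (Method method : type.getDeclaredMethods()) {
            Listener listener = method.getAnnotation(Listener.class);
            if (listener != null) {
                found.put(listener.destination(), method.getName());
            }
        }
        if (found.isEmpty()) {
            nonAnnotatedClasses.add(type);
        }
        return found;
    }

    boolean isCachedAsNonAnnotated(Class<?> type) {
        return nonAnnotatedClasses.contains(type);
    }

    public static void main(String[] args) {
        ListenerScanSketch scanner = new ListenerScanSketch();
        System.out.println(scanner.scan(new Consumer()));  // prints {sample.queue=receiveQueue}
        System.out.println(scanner.scan(new PlainBean())); // prints {}
    }
}
```

The cache matters because a bean post-processor sees every bean in the context, and most of them have no listener methods at all.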

protected void processJmsListener(JmsListener jmsListener, Method mostSpecificMethod, Object bean) {
    Method invocableMethod = AopUtils.selectInvocableMethod(mostSpecificMethod, bean.getClass());

    MethodJmsListenerEndpoint endpoint = createMethodJmsListenerEndpoint();
    endpoint.setBean(bean);
    endpoint.setMethod(invocableMethod);
    endpoint.setMostSpecificMethod(mostSpecificMethod);
    endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
    endpoint.setEmbeddedValueResolver(this.embeddedValueResolver);
    endpoint.setBeanFactory(this.beanFactory);
    endpoint.setId(getEndpointId(jmsListener));
    endpoint.setDestination(resolve(jmsListener.destination()));
    if (StringUtils.hasText(jmsListener.selector())) {
        endpoint.setSelector(resolve(jmsListener.selector()));
    }
    if (StringUtils.hasText(jmsListener.subscription())) {
        endpoint.setSubscription(resolve(jmsListener.subscription()));
    }
    if (StringUtils.hasText(jmsListener.concurrency())) {
        endpoint.setConcurrency(resolve(jmsListener.concurrency()));
    }

    JmsListenerContainerFactory<?> factory = null;
    String containerFactoryBeanName = resolve(jmsListener.containerFactory());
    if (StringUtils.hasText(containerFactoryBeanName)) {
        Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
        try {
            factory = this.beanFactory.getBean(containerFactoryBeanName, JmsListenerContainerFactory.class);
        }
        catch (NoSuchBeanDefinitionException ex) {
            throw new BeanInitializationException("Could not register JMS listener endpoint on [" +
                    mostSpecificMethod + "], no " + JmsListenerContainerFactory.class.getSimpleName() +
                    " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
        }
    }

    this.registrar.registerEndpoint(endpoint, factory);
}

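The method first sets the endpoint's properties, then looks up the JmsListenerContainerFactory named by the annotation (if one is named), and finally registers the endpoint together with that factory on the registrar via this.registrar.registerEndpoint.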
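To make the binding concrete, here is a minimal plain-Java model of "endpoint plus factory goes into a registrar, and an incoming message is dispatched to the bound method". It is a sketch under stated assumptions: EndpointRegistrarSketch, Endpoint and deliver are hypothetical names, factories are represented by their bean names only, and a null factory is assumed to mean "fall back to the registrar's default", which is how the null passed in the listing is interpreted here.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy registrar; NOT Spring's JmsListenerEndpointRegistrar. All names are hypothetical.
class EndpointRegistrarSketch {

    // Reduced counterpart of a method endpoint: target bean, method, destination.
    static class Endpoint {
        final Object bean;
        final Method method;
        final String destination;

        Endpoint(Object bean, String methodName, String destination) {
            Method resolved;
            try {
                resolved = bean.getClass().getDeclaredMethod(methodName, String.class);
            } catch (NoSuchMethodException ex) {
                throw new IllegalArgumentException("No such listener method: " + methodName, ex);
            }
            this.bean = bean;
            this.method = resolved;
            this.destination = destination;
        }

        // Invokes the bound listener method with the message payload.
        void onMessage(String text) {
            try {
                method.invoke(bean, text);
            } catch (ReflectiveOperationException ex) {
                throw new IllegalStateException("Listener invocation failed", ex);
            }
        }
    }

    static class Consumer {
        final List<String> received = new ArrayList<>();

        public void receiveQueue(String text) {
            received.add(text);
        }
    }

    private final Map<String, Endpoint> endpoints = new HashMap<>();
    private final Map<String, String> factoryNames = new HashMap<>();
    private final String defaultFactoryName;

    EndpointRegistrarSketch(String defaultFactoryName) {
        this.defaultFactoryName = defaultFactoryName;
    }

    // Assumption: a null factory name means "use the default factory".
    void registerEndpoint(Endpoint endpoint, String factoryName) {
        endpoints.put(endpoint.destination, endpoint);
        factoryNames.put(endpoint.destination,
                factoryName != null ? factoryName : defaultFactoryName);
    }

    String factoryFor(String destination) {
        return factoryNames.get(destination);
    }

    // Simulates a message arriving on a destination.
    void deliver(String destination, String text) {
        Endpoint endpoint = endpoints.get(destination);
        if (endpoint != null) {
            endpoint.onMessage(text);
        }
    }

    public static void main(String[] args) {
        Consumer consumer = new Consumer();
        EndpointRegistrarSketch registrar = new EndpointRegistrarSketch("jmsListenerContainerFactory");
        registrar.registerEndpoint(new Endpoint(consumer, "receiveQueue", "sample.queue"), null);
        registrar.deliver("sample.queue", "hello");
        System.out.println(consumer.received);                  // prints [hello]
        System.out.println(registrar.factoryFor("sample.queue")); // prints jmsListenerContainerFactory
    }
}
```

In the real framework the factory creates a listener container that connects to the broker; the map lookup in deliver only stands in for that machinery.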