
SpringBoot 2.0 Source Code Analysis (2): ActiveMQ Integration

程序员文章站 2022-07-02 16:35:49

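For the concrete steps to integrate ActiveMQ with SpringBoot, see: SpringBoot 2.0 in Practice (2): Integrating ActiveMQ with SpringBoot 2.0

ActiveMQ auto-configuration

When both javax.jms.Message and org.springframework.jms.core.JmsTemplate are on the classpath (and a ConnectionFactory bean has been defined), SpringBoot registers the objects needed to work with ActiveMQ as beans that the project can inject. Let's look at the JmsAutoConfiguration class.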

@Configuration
@ConditionalOnClass({ Message.class, JmsTemplate.class })
@ConditionalOnBean(ConnectionFactory.class)
@EnableConfigurationProperties(JmsProperties.class)
@Import(JmsAnnotationDrivenConfiguration.class)
public class JmsAutoConfiguration {

    @Configuration
    protected static class JmsTemplateConfiguration {
        ......
        @Bean
        @ConditionalOnMissingBean
        @ConditionalOnSingleCandidate(ConnectionFactory.class)
        public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
            PropertyMapper map = PropertyMapper.get();
            JmsTemplate template = new JmsTemplate(connectionFactory);
            template.setPubSubDomain(this.properties.isPubSubDomain());
            map.from(this.destinationResolver::getIfUnique).whenNonNull()
                    .to(template::setDestinationResolver);
            map.from(this.messageConverter::getIfUnique).whenNonNull()
                    .to(template::setMessageConverter);
            mapTemplateProperties(this.properties.getTemplate(), template);
            return template;
        }
        ......
    }

    @Configuration
    @ConditionalOnClass(JmsMessagingTemplate.class)
    @Import(JmsTemplateConfiguration.class)
    protected static class MessagingTemplateConfiguration {

        @Bean
        @ConditionalOnMissingBean
        @ConditionalOnSingleCandidate(JmsTemplate.class)
        public JmsMessagingTemplate jmsMessagingTemplate(JmsTemplate jmsTemplate) {
            return new JmsMessagingTemplate(jmsTemplate);
        }

    }

}

JmsAutoConfiguration mainly registers JmsMessagingTemplate and JmsTemplate.
JmsAutoConfiguration also imports JmsAnnotationDrivenConfiguration, which registers a JmsListenerContainerFactory.
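
To make the conditional registration idea more concrete, here is a minimal sketch in plain Java. It is not SpringBoot's implementation: `ConditionalRegistrySketch`, `classesPresent` and `registerIfMissing` are hypothetical names invented for illustration. The first helper mimics the spirit of @ConditionalOnClass (apply the configuration only if the classes can be loaded), the second mimics @ConditionalOnMissingBean (an existing bean with the same name is never replaced).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical miniature of conditional bean registration; not SpringBoot's real code.
class ConditionalRegistrySketch {

    private final Map<String, Object> beans = new LinkedHashMap<>();

    // Spirit of @ConditionalOnClass: true only if every named class can be loaded.
    static boolean classesPresent(String... classNames) {
        for (String name : classNames) {
            try {
                Class.forName(name, false, ConditionalRegistrySketch.class.getClassLoader());
            } catch (ClassNotFoundException | LinkageError ex) {
                return false;
            }
        }
        return true;
    }

    // Spirit of @ConditionalOnMissingBean: an already registered bean wins.
    boolean registerIfMissing(String name, Supplier<Object> factory) {
        if (beans.containsKey(name)) {
            return false;
        }
        beans.put(name, factory.get());
        return true;
    }

    Object getBean(String name) {
        return beans.get(name);
    }

    public static void main(String[] args) {
        ConditionalRegistrySketch registry = new ConditionalRegistrySketch();
        // java.lang.String always exists, so this stand-in "auto-configuration" applies.
        if (classesPresent("java.lang.String")) {
            registry.registerIfMissing("jmsTemplate", () -> "auto-configured template");
        }
        // A second registration under the same name is skipped.
        boolean replaced = registry.registerIfMissing("jmsTemplate", () -> "another template");
        System.out.println(registry.getBean("jmsTemplate") + ", replaced=" + replaced);
    }
}
```

In this sketch the order of registration decides which bean wins; it only illustrates why a JmsTemplate defined by the application takes precedence over the auto-configured one.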

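Sending messages

Take the following send call as an example:

    jmsMessagingTemplate.convertAndSend(this.queue, msg);

This method first converts and pre-processes the message, and ultimately sends it by calling JmsTemplate's doSend.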

    protected void doSend(Session session, Destination destination, MessageCreator messageCreator)
            throws JMSException {
        Assert.notNull(messageCreator, "MessageCreator must not be null");
        MessageProducer producer = createProducer(session, destination);
        try {
            Message message = messageCreator.createMessage(session);
            doSend(producer, message);
            if (session.getTransacted() && isSessionLocallyTransacted(session)) {
                JmsUtils.commitIfNecessary(session);
            }
        }
        finally {
            JmsUtils.closeMessageProducer(producer);
        }
    }

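doSend first creates a MessageProducer instance, then the MessageCreator turns the original org.springframework.messaging.Message into a javax.jms.Message, and finally the message is handed to the producer for sending. If the session is locally transacted it is committed, and the producer is always closed in the finally block.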
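
The control flow of doSend (create a producer, build the message through a callback, send, always close the producer) can be sketched without any JMS dependency. Everything below is a hypothetical stand-in: `SendFlowSketch`, its nested `Producer` and `MessageCreator`, and the in-memory `queue` are invented for illustration and only model the order of the steps, not the real JmsTemplate.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the JMS types; only the control flow of doSend is modeled.
class SendFlowSketch {

    // Callback that builds the message as late as possible, like MessageCreator.
    interface MessageCreator {
        String createMessage();
    }

    static class Producer {
        final List<String> sent;
        boolean closed;

        Producer(List<String> sent) {
            this.sent = sent;
        }

        void send(String message) {
            sent.add(message);
        }

        void close() {
            closed = true;
        }
    }

    final List<String> queue = new ArrayList<>();
    Producer lastProducer;

    void doSend(MessageCreator creator) {
        if (creator == null) {
            throw new IllegalArgumentException("MessageCreator must not be null");
        }
        Producer producer = new Producer(queue);
        lastProducer = producer;
        try {
            String message = creator.createMessage();
            producer.send(message);
        } finally {
            // Runs even when message creation or sending fails.
            producer.close();
        }
    }

    // Plays the role of convertAndSend: wraps the payload in a creator callback.
    void convertAndSend(Object payload) {
        doSend(() -> String.valueOf(payload));
    }

    public static void main(String[] args) {
        SendFlowSketch template = new SendFlowSketch();
        template.convertAndSend(42);
        System.out.println(template.queue + " closed=" + template.lastProducer.closed);
    }
}
```

Passing a callback instead of a finished message is what lets the real doSend create the javax.jms.Message from the Session it is given; the sketch keeps that shape but drops the session and transaction handling.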

Receiving messages

First, a consumer example:

@Component
public class Consumer {
    @JmsListener(destination = "sample.queue")
    public void receiveQueue(String text) {
        System.out.println(text);
    }
}

SpringBoot parses @JmsListener; the implementation is in the postProcessAfterInitialization method of JmsListenerAnnotationBeanPostProcessor.

    public Object postProcessAfterInitialization(final Object bean, String beanName) throws BeansException {
        if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
            Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
            Map<Method, Set<JmsListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<Set<JmsListener>>) method -> {
                        Set<JmsListener> listenerMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                                method, JmsListener.class, JmsListeners.class);
                        return (!listenerMethods.isEmpty() ? listenerMethods : null);
                    });
            if (annotatedMethods.isEmpty()) {
                this.nonAnnotatedClasses.add(bean.getClass());
            }
            else {
                annotatedMethods.forEach((method, listeners) ->
                        listeners.forEach(listener ->
                                processJmsListener(listener, method, bean)));
            }
        }
        return bean;
    }

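SpringBoot uses the annotation to find the methods marked with @JmsListener; when a message received by ActiveMQ is delivered to the listener, the corresponding method is invoked. Let's see how a listener is bound to its method.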

    protected void processJmsListener(JmsListener jmsListener, Method mostSpecificMethod, Object bean) {
        Method invocableMethod = AopUtils.selectInvocableMethod(mostSpecificMethod, bean.getClass());
        MethodJmsListenerEndpoint endpoint = createMethodJmsListenerEndpoint();
        endpoint.setBean(bean);
        endpoint.setMethod(invocableMethod);
        endpoint.setMostSpecificMethod(mostSpecificMethod);
        endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
        endpoint.setEmbeddedValueResolver(this.embeddedValueResolver);
        endpoint.setBeanFactory(this.beanFactory);
        endpoint.setId(getEndpointId(jmsListener));
        endpoint.setDestination(resolve(jmsListener.destination()));
        if (StringUtils.hasText(jmsListener.selector())) {
            endpoint.setSelector(resolve(jmsListener.selector()));
        }
        if (StringUtils.hasText(jmsListener.subscription())) {
            endpoint.setSubscription(resolve(jmsListener.subscription()));
        }
        if (StringUtils.hasText(jmsListener.concurrency())) {
            endpoint.setConcurrency(resolve(jmsListener.concurrency()));
        }

        JmsListenerContainerFactory<?> factory = null;
        String containerFactoryBeanName = resolve(jmsListener.containerFactory());
        if (StringUtils.hasText(containerFactoryBeanName)) {
            Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
            try {
                factory = this.beanFactory.getBean(containerFactoryBeanName, JmsListenerContainerFactory.class);
            }
            catch (NoSuchBeanDefinitionException ex) {
                throw new BeanInitializationException("Could not register JMS listener endpoint on [" +
                        mostSpecificMethod + "], no " + JmsListenerContainerFactory.class.getSimpleName() +
                        " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
            }
        }

        this.registrar.registerEndpoint(endpoint, factory);
    }

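The method first sets the endpoint's properties, then looks up the JmsListenerContainerFactory (when the annotation names one), and finally registers the endpoint together with that factory through the registrar.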
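
The scan-and-bind steps described above can be reproduced in a few lines of plain reflection. The sketch below is only an illustration under stated assumptions: `ListenerScanSketch`, the `@Listener` annotation, `Endpoint`, `postProcess` and `deliver` are hypothetical names, and the listener container is reduced to a map lookup. It shows the general technique (find annotated methods on a bean, record bean plus method as an endpoint, invoke the method when a message arrives), not Spring's actual classes.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical miniature of listener scanning; none of these types are Spring classes.
class ListenerScanSketch {

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Listener {
        String destination();
    }

    // Plays the role of MethodJmsListenerEndpoint: a bean plus the method to invoke.
    static class Endpoint {
        final Object bean;
        final Method method;

        Endpoint(Object bean, Method method) {
            this.bean = bean;
            this.method = method;
        }
    }

    // destination -> endpoint, filled while beans are "post-processed".
    final Map<String, Endpoint> endpoints = new HashMap<>();

    // Plays the role of postProcessAfterInitialization plus processJmsListener.
    Object postProcess(Object bean) {
        for (Method method : bean.getClass().getDeclaredMethods()) {
            Listener listener = method.getAnnotation(Listener.class);
            if (listener != null) {
                method.setAccessible(true);
                endpoints.put(listener.destination(), new Endpoint(bean, method));
            }
        }
        return bean;
    }

    // Plays the role of the listener container handing over a received message.
    boolean deliver(String destination, String text) {
        Endpoint endpoint = endpoints.get(destination);
        if (endpoint == null) {
            return false;
        }
        try {
            endpoint.method.invoke(endpoint.bean, text);
        } catch (ReflectiveOperationException ex) {
            throw new IllegalStateException("Listener invocation failed", ex);
        }
        return true;
    }

    static class Consumer {
        final List<String> received = new ArrayList<>();

        @Listener(destination = "sample.queue")
        public void receiveQueue(String text) {
            received.add(text);
        }
    }

    public static void main(String[] args) {
        ListenerScanSketch processor = new ListenerScanSketch();
        Consumer consumer = (Consumer) processor.postProcess(new Consumer());
        processor.deliver("sample.queue", "hello");
        System.out.println(consumer.received);
    }
}
```

The sketch skips everything that makes the real processor robust, such as proxy unwrapping, repeatable annotations, placeholder resolution and the choice of container factory.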


That's all for this article. If you found it useful, feel free to like, follow, and subscribe to the WeChat public account 【贰级天災】 for more articles.
