源码解读Spring-Integration执行过程
一,前言
spring-integration基于spring,在应用程序中启用了轻量级消息传递,并支持通过声明式适配器与外部系统集成。这一段官网的介绍,概况了整个integration的用途。个人感觉消息传递是真正的重点。
如上图所示,典型的生产者-消费者模式,中间通过一个特定的通道进行数据传输,说到这,是不是隐隐感觉到queue的存在。确实事实上这个所谓的通道默认就是用的 blockingqueue。
spring-integration网上的资料是真少,再加上源码分析的是更少。关于spring-integration的基本介绍直接去官网上看更加的直观,这边就不累述了。
今天主要是看个简单的hello word进来分析下整个执行过程。
先看下代码:
<?xml version="1.0" encoding="utf-8"?> <beans:beans xmlns="http://www.springframework.org/schema/integration" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xmlns:beans="http://www.springframework.org/schema/beans" xsi:schemalocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd"> <annotation-config/> <channel id="oc" > <queue/> </channel> <beans:bean id="beans" class="com.example.demo.beans"/> </beans:beans>
@configuration public class beans { @serviceactivator(inputchannel = "ic", outputchannel = "oc") public string sayhello(string name) { return "hello " + name; } }
public class helloworlddemo { @test public void testdemo() throws exception { classpathxmlapplicationcontext context = new classpathxmlapplicationcontext("/demo.xml", helloworlddemo.class); directchannel inputchannel = context.getbean("ic", directchannel.class); pollablechannel outputchannel = context.getbean("oc", pollablechannel.class); inputchannel.send(new genericmessage<string>("world")); system.out.println("==> helloworlddemo: " + outputchannel.receive(0).getpayload()); context.close(); } } out: ==> helloworlddemo: hello world
二,serviceactivator
上面的代码演示了调用方法的入站通道适配器和标准的出站通道适配器, 它们之间是一个带注解的serviceactivator。关于这个serviceactivator就是一个消息端点。
消息端点的主要作用是以非侵入性方式将应用程序代码连接到消息传递框架。换句话说,理想情况下,应用程序代码应该不知道消息对象或消息管道。这类似于 mvc 范式中controller 的作用。正如controller 处理 http 请求一样,消息端点处理消息。以及controller 映射到 url 模式一样,消息端点映射到消息通道。这两种情况的目标是相同的。
serviceactivator是用于将服务实例连接到消息传递系统的通用端点。必须配置输入消息通道,如果要调用的服务方法能够返回值,还可以提供输出消息通道。
具体流程如下图:
上面的代码比较简单,但是或许会发现我们只定义了输出通道oc,输入通道ic竟然没有定义也能正常应用,是不是很奇怪?带着疑问我们先看下serviceactivator的源码:
注释上写的很清楚,如果输入通道不存在,将在应用程序上下文中注册具有此名称的directchannel 。具体在哪定义,我们后面会看到,现在不急,先一步步来看他的执行过程。
我们全局查找serviceactivator,看他是哪边进行处理的,最后发现了messagingannotationpostprocessor类,用来处理方法级消息注解的beanpostprocessor实现。
@override public void afterpropertiesset() { assert.notnull(this.beanfactory, "beanfactory must not be null"); ((beandefinitionregistry) this.beanfactory).registerbeandefinition( integrationcontextutils.disposables_bean_name, beandefinitionbuilder.genericbeandefinition(disposables.class, disposables::new) .getrawbeandefinition()); this.postprocessors.put(filter.class, new filterannotationpostprocessor(this.beanfactory)); this.postprocessors.put(router.class, new routerannotationpostprocessor(this.beanfactory)); this.postprocessors.put(transformer.class, new transformerannotationpostprocessor(this.beanfactory)); this.postprocessors.put(serviceactivator.class, new serviceactivatorannotationpostprocessor(this.beanfactory)); this.postprocessors.put(splitter.class, new splitterannotationpostprocessor(this.beanfactory)); this.postprocessors.put(aggregator.class, new aggregatorannotationpostprocessor(this.beanfactory)); this.postprocessors.put(inboundchanneladapter.class, new inboundchanneladapterannotationpostprocessor(this.beanfactory)); this.postprocessors.put(bridgefrom.class, new bridgefromannotationpostprocessor(this.beanfactory)); this.postprocessors.put(bridgeto.class, new bridgetoannotationpostprocessor(this.beanfactory)); map<class<? extends annotation>, methodannotationpostprocessor<?>> custompostprocessors = setupcustompostprocessors(); if (!collectionutils.isempty(custompostprocessors)) { this.postprocessors.putall(custompostprocessors); } }
在afterpropertiesset方法中,我们看到定义了一个后处理器postprocessors,里面注册了相关的注解处理类。包含各种消息端点处理,除了上面写的serviceactivator,还有过滤器,路由,转换器等各种不同的端点方法。
接着往向下看,既然实现了beanpostprocessor
,那必然要用到postprocessafterinitialization
方法实现,这里的流程大概就是遍历出包含有@serviceactivator的bean方法,用来做后续处理。我们直接看重点的代码。
object result = postprocessor.postprocess(bean, beanname, targetmethod, annotations);
三,postprocess
在abstractmethodannotationpostprocessor
中有个共通方法postprocess用来生成对应的端点信息。具体代码:
@override public object postprocess(object bean, string beanname, method method, list<annotation> annotations) { object sourcehandler = null; if (beanannotationaware() && annotatedelementutils.isannotated(method, bean.class.getname())) { if (!this.beanfactory.containsbeandefinition(resolvetargetbeanname(method))) { this.logger.debug("skipping endpoint creation; perhaps due to some '@conditional' annotation."); return null; } else { sourcehandler = resolvetargetbeanfrommethodwithbeanannotation(method); } } //生成对应的messagehandler,用来执行对应的注解的方法 messagehandler handler = createhandler(bean, method, annotations); if (!(handler instanceof reactivemessagehandleradapter)) { orderable(method, handler); producerorrouter(annotations, handler); if (!handler.equals(sourcehandler)) { handler = registerhandlerbean(beanname, method, handler); } handler = annotated(method, handler); handler = advicechain(beanname, annotations, handler); } //将messagehandler实现连接到消息端点,生成对应的endpoint。 abstractendpoint endpoint = createendpoint(handler, method, annotations); if (endpoint != null) { return endpoint; } else { return handler; } }
这里面主要是两件事:
- 根据模板模式中不同的createhandler抽象方法实现,生成对应的messagehandler。譬如说我们这边的
serviceactivatorannotationpostprocessor
- 将messagehandler实现连接到消息端点,生成对应的endpoint。
1.createhandler
@override protected messagehandler createhandler(object bean, method method, list<annotation> annotations) { abstractreplyproducingmessagehandler serviceactivator; if (annotatedelementutils.isannotated(method, bean.class.getname())) { ... else { serviceactivator = new serviceactivatinghandler(bean, method); } string requiresreply = messagingannotationutils.resolveattribute(annotations, "requiresreply", string.class); if (stringutils.hastext(requiresreply)) { serviceactivator.setrequiresreply(resolveattributetoboolean(requiresreply)); } string isasync = messagingannotationutils.resolveattribute(annotations, "async", string.class); if (stringutils.hastext(isasync)) { serviceactivator.setasync(resolveattributetoboolean(isasync)); } //是否设置了输出通道 setoutputchannelifpresent(annotations, serviceactivator); return serviceactivator; }
createhandler
的代码比较简单,就是根据注解中的几个属性还有对应的方法参数,生成serviceactivatinghandler
。追溯下去serviceactivatinghandler
中最后会生成一个委托对象messagingmethodinvokerhelper
用来以反射的方式来执行目标方法。
2.createendpoint
createendpoint
字面上都能知道是生成消息端点,事实上也是,把生成的handler和对应的管道进行关联。具体看下代码体会:
protected abstractendpoint createendpoint(messagehandler handler, @suppresswarnings("unused") method method, list<annotation> annotations) { abstractendpoint endpoint = null; //取得注解中inputchannelname string inputchannelname = messagingannotationutils.resolveattribute(annotations, getinputchannelattribute(), string.class); if (stringutils.hastext(inputchannelname)) { messagechannel inputchannel; try { //从beanfactory中取得对应的通道bean inputchannel = this.channelresolver.resolvedestination(inputchannelname); } catch (destinationresolutionexception e) { //取不到,则自动注册一个类型为directchannel的inputchannel if (e.getcause() instanceof nosuchbeandefinitionexception) { inputchannel = new directchannel(); this.beanfactory.registersingleton(inputchannelname, inputchannel); inputchannel = (messagechannel) this.beanfactory.initializebean(inputchannel, inputchannelname); if (this.disposables != null) { this.disposables.add((disposablebean) inputchannel); } } else { throw e; } } assert.notnull(inputchannel, () -> "failed to resolve inputchannel '" + inputchannelname + "'"); //生成endpoint endpoint = docreateendpoint(handler, inputchannel, annotations); } return endpoint; }
上面的代码中,我们就能清楚的看到为什么我们在demo中没有注册输入通道也能正常应用的原因了,从而回答之前的疑问。
protected abstractendpoint docreateendpoint(messagehandler handler, messagechannel inputchannel, list<annotation> annotations) { .... else if (inputchannel instanceof subscribablechannel) { //生成subscribablechannel类型对应的执行端点 return new eventdrivenconsumer((subscribablechannel) inputchannel, handler); } else if (inputchannel instanceof pollablechannel) { return pollingconsumer(inputchannel, handler, pollers); } else { throw new illegalargumentexception("unsupported 'inputchannel' type: '" + inputchannel.getclass().getname() + "'. " + "must be one of 'subscribablechannel', 'pollablechannel' or 'reactivestreamssubscribablechannel'"); } }
通道类型一共有两种,一种是发布订阅,一种是可轮询的,我们是默认是走的第一种,因为directchannel默认就是个subscribablechannel。所以最终我们生成了对应的信息端点类eventdrivenconsumer。
我们先看下eventdrivenconsumer整体结构:
eventdrivenconsumer上面有一个抽象类abstractendpoint
,最上面实现了lifecycle
接口,所以生命周期跟着容器走,我们直接跳到star方法看:
@override protected void dostart() { this.logcomponentsubscriptionevent(true); //把handler和inputchannel进行绑定 this.inputchannel.subscribe(this.handler); if (this.handler instanceof lifecycle) { ((lifecycle) this.handler).start(); } }
@override public synchronized boolean addhandler(messagehandler handler) { assert.notnull(handler, "handler must not be null"); assert.istrue(this.handlers.size() < this.maxsubscribers, "maximum subscribers exceeded"); boolean added = this.handlers.add(handler); if (this.handlers.size() == 1) { this.theonehandler = handler; } else { this.theonehandler = null; } return added; }
上面的代码主要就是把handler注册到inputchannel中,这样只要inputchannel通道一收到信息,就会通知他注册的handlers进行处理。代码中比较清楚的记录了一切的操作,就不多解释了。
四,发送信息
执行完上面一系列的注册,已经把这一些的通道打通了,剩下的就是真正的发送操作了。下面分析下inputchannel.send(new genericmessage<string>("world"));
看看send操作:
/** * 在此频道上发送消息。 如果通道已满,则此方法将阻塞,直到发生超时或发送线程中断。 如果指定的超时时间为 0,则该方法将立即返回。 如果小于零,它将无限期阻塞(请参阅send(message) )。 * 参数: * messagearg – 要发送的消息 * timeout - 以毫秒为单位的超时时间 * 返回: * true如果消息发送成功, false如果消息无法在规定时间内发送或发送线程被中断 */ @override public boolean send(message<?> messagearg, long timeout) { ... try { //message是否需要转换 message = convertpayloadifnecessary(message); //发送前拦截器 if (interceptorlist.getsize() > 0) { interceptorstack = new arraydeque<>(); message = interceptorlist.presend(message, this, interceptorstack); if (message == null) { return false; } } if (this.metricscaptor != null) { sample = this.metricscaptor.start(); } //发送操作 sent = dosend(message, timeout); if (sample != null) { sample.stop(sendtimer(sent)); } metricsprocessed = true; if (debugenabled) { logger.debug("postsend (sent=" + sent + ") on channel '" + this + "', message: " + message); } //发送后拦截器 if (interceptorstack != null) { interceptorlist.postsend(message, this, sent); interceptorlist.aftersendcompletion(message, this, sent, null, interceptorstack); } return sent; } catch (exception ex) { ... } }
真正的send操作跟下去,会发现层次极深,碍于篇幅,我们直接跟到重点代码:
@override protected final void handlemessageinternal(message<?> message) { object result; if (this.advisedrequesthandler == null) { //反射执行对应的端点方法 result = handlerequestmessage(message); } else { result = doinvokeadvisedrequesthandler(message); } if (result != null) { //往outputchannel发送执行结果 sendoutputs(result, message); } ... }
handlerequestmessage的操作就是用之前我们handler中的委托类messagingmethodinvokerhelper
去反射运行对应的端点方法,然后把执行结果发送outputchannel。最后我们直接定位到具体的发送操作:
@override protected boolean dosend(message<?> message, long timeout) { assert.notnull(message, "'message' must not be null"); try { if (this.queue instanceof blockingqueue) { blockingqueue<message<?>> blockingqueue = (blockingqueue<message<?>>) this.queue; if (timeout > 0) { return blockingqueue.offer(message, timeout, timeunit.milliseconds); } if (timeout == 0) { return blockingqueue.offer(message); } blockingqueue.put(message); return true; } else { try { return this.queue.offer(message); } finally { this.queuesemaphore.release(); } } } catch (interruptedexception e) { thread.currentthread().interrupt(); return false; } }
看到这,我们就明白了数据的去向,存储在队列里了,生产者产生的数据就已经生成了,所以发送的操作基本上就告一段落了。
五,接收信息
数据已经生成,后面就是看如何消费操作了,下面分析下 outputchannel.receive(0).getpayload()
操作:
/** * 从该通道接收第一条可用消息。 如果通道不包含任何消息,则此方法将阻塞,直到分配的超时时间过去。 如果指定的超时时间为 0,则该方法将立即返回。 如果小于零,它将无限期阻塞(参见receive() )。 * 参数: * timeout - 以毫秒为单位的超时时间 * 返回: * 如果在分配的时间内没有可用的消息或接收线程被中断,则为第一个可用消息或null 。 */ @override // nosonar complexity @nullable public message<?> receive(long timeout) { ... try { //接受前拦截器操作 if (interceptorlist.getsize() > 0) { interceptorstack = new arraydeque<>(); //一旦调用接收并在实际检索消息之前调用 if (!interceptorlist.prereceive(this, interceptorstack)) { return null; } } //接收操作 message<?> message = doreceive(timeout); ... //在检索到 message 之后但在将其返回给调用者之前立即调用。 必要时可以修改消息 if (interceptorstack != null && message != null) { message = interceptorlist.postreceive(message, this); } //在接收完成后调用,而不管已引发的任何异常,从而允许适当的资源清理 interceptorlist.afterreceivecompletion(message, this, null, interceptorstack); return message; } catch (runtimeexception ex) { ... } }
最后的doreceive操作,其实大家都心知肚明了,就是从上面的队列中直接读取数据,代码比较简单,就不注释了:
@override @nullable protected message<?> doreceive(long timeout) { try { if (timeout > 0) { if (this.queue instanceof blockingqueue) { return ((blockingqueue<message<?>>) this.queue).poll(timeout, timeunit.milliseconds); } else { return pollnonblockingqueue(timeout); } } if (timeout == 0) { return this.queue.poll(); } if (this.queue instanceof blockingqueue) { return ((blockingqueue<message<?>>) this.queue).take(); } else { message<?> message = this.queue.poll(); while (message == null) { this.queuesemaphore.tryacquire(50, timeunit.milliseconds); // nosonar ok to ignore result message = this.queue.poll(); } return message; } } catch (interruptedexception e) { thread.currentthread().interrupt(); return null; } }
六,结语
能坚持看到这的,基本上都是勇士了。这一系列的执行过程其实还是比较绕的,我估计有些人看得也是云里雾里。其实我已经尽量精简了许多,spring-integration其实涉及到的应用分支更多,我这也只是十分基础的东西,我只能把我自己知道的先记录下来。如果让你对spring-integration产生了兴趣,那本文的目的就达到了。这需要你自己去实地操作研究下,总是有收获的。
以上就是源码简析spring-integration执行流程的详细内容,更多关于spring integration执行的资料请关注其它相关文章!
推荐阅读
-
Mybaits 源码解析 (六)----- 全网最详细:Select 语句的执行过程分析(上篇)(Mapper方法是如何调用到XML中的SQL的?)
-
jQuery选择器源码解读(五):tokenize的解析过程
-
Presto阅读源码Sql执行过程的记录
-
c#源码的执行过程
-
Laravel5.5源码详解 -- 一次查询的详细执行:从Auth-Login-web中间件到数据库查询结果的全过程
-
C/C++从源码到可执行程序的过程讲解
-
Flink中TaskManager端执行用户逻辑过程(源码分析)
-
Mybaits 源码解析 (七)----- Select 语句的执行过程分析(下篇)全网最详细,没有之一
-
源码解读Spring-Integration执行过程
-
jQuery选择器源码解读(五):tokenize的解析过程