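Spring Boot with the Spring Integration Java DSL: sample code
The Spring Integration Java DSL has been merged into Spring Integration Core 5.0, which is a smart and obvious move because:
- Everyone who starts a new Spring project based on Java config uses it
- The SI Java DSL lets you use powerful new Java 8 features such as lambdas
- You can build flows with the builder pattern, based on IntegrationFlowBuilder
Let's see how to use it in examples based on ActiveMQ JMS.
Maven dependencies: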
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-jms</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-kahadb-store</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-java-dsl -->
        <!-- Standalone DSL artifact for Spring Integration 4.x; from 5.0 on the DSL ships inside spring-integration-core -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-java-dsl</artifactId>
            <version>1.2.3.RELEASE</version>
        </dependency>
    </dependencies>
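Sample 1: JMS inbound gateway
We have the following service activator:
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.stereotype.Service;

    @Service
    public class ActiveMQEndpoint {

        // Called for every message that arrives on inboundChannel
        @ServiceActivator(inputChannel = "inboundChannel")
        public void processMessage(final String inboundPayload) {
            System.out.println("Inbound message: " + inboundPayload);
        }
    }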
If you want to use the SI Java DSL to deliver inboundPayload from a JMS queue to the activator in gateway style, use the DSL Jms factory:
    @Bean
    public DynamicDestinationResolver dynamicDestinationResolver() {
        return new DynamicDestinationResolver();
    }

    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        // No-arg constructor: uses ActiveMQ's default broker URL
        return new ActiveMQConnectionFactory();
    }

    @Bean
    public DefaultMessageListenerContainer listenerContainer() {
        final DefaultMessageListenerContainer defaultMessageListenerContainer =
                new DefaultMessageListenerContainer();
        defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());
        defaultMessageListenerContainer.setConnectionFactory(connectionFactory());
        // Queue the container listens on
        defaultMessageListenerContainer.setDestinationName("jms.activemq.test");
        return defaultMessageListenerContainer;
    }

    @Bean
    public MessageChannel inboundChannel() {
        return MessageChannels.direct("inboundChannel").get();
    }

    @Bean
    public JmsInboundGateway dataEndpoint() {
        // Forwards each JMS message to inboundChannel
        return Jms.inboundGateway(listenerContainer())
                .requestChannel(inboundChannel()).get();
    }
Because the dataEndpoint bean is built from a JmsInboundGatewaySpec, you can also send replies to an SI channel or to a JMS destination. See the documentation.
Sample 2: JMS message-driven channel adapter
If you are looking for a replacement for the XML JMS configuration of a message-driven channel adapter, JmsMessageDrivenChannelAdapter is the way to go:
    @Bean
    public DynamicDestinationResolver dynamicDestinationResolver() {
        return new DynamicDestinationResolver();
    }

    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory();
    }

    @Bean
    public DefaultMessageListenerContainer listenerContainer() {
        final DefaultMessageListenerContainer defaultMessageListenerContainer =
                new DefaultMessageListenerContainer();
        defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());
        defaultMessageListenerContainer.setConnectionFactory(connectionFactory());
        defaultMessageListenerContainer.setDestinationName("jms.activemq.test");
        return defaultMessageListenerContainer;
    }

    @Bean
    public MessageChannel inboundChannel() {
        return MessageChannels.direct("inboundChannel").get();
    }

    @Bean
    public JmsMessageDrivenChannelAdapter dataEndpoint() {
        final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =
                new ChannelPublishingJmsMessageListener();
        // One-way adapter: no reply is sent back to the JMS sender
        channelPublishingJmsMessageListener.setExpectReply(false);
        final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter =
                new JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener);
        messageDrivenChannelAdapter.setOutputChannel(inboundChannel());
        return messageDrivenChannelAdapter;
    }
As in the previous example, the inbound payload is delivered to the activator from Sample 1.
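Sample 3: JMS message-driven channel adapter with JAXB
In a typical scenario you want to accept XML as a text message over JMS, convert it to JAXB stubs and process it in a service activator. I will show you how to do this with the SI Java DSL, but first let's add two dependencies for XML processing:
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-xml</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-oxm</artifactId>
    </dependency>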
We are going to accept shiporders over JMS, so first comes the XSD, named shiporder.xsd:
    <?xml version="1.0" encoding="UTF-8" ?>
    <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
      <xs:element name="shiporder">
        <xs:complexType>
          <xs:sequence>
            <xs:element name="orderperson" type="xs:string"/>
            <xs:element name="shipto">
              <xs:complexType>
                <xs:sequence>
                  <xs:element name="name" type="xs:string"/>
                  <xs:element name="address" type="xs:string"/>
                  <xs:element name="city" type="xs:string"/>
                  <xs:element name="country" type="xs:string"/>
                </xs:sequence>
              </xs:complexType>
            </xs:element>
            <xs:element name="item" maxOccurs="unbounded">
              <xs:complexType>
                <xs:sequence>
                  <xs:element name="title" type="xs:string"/>
                  <xs:element name="note" type="xs:string" minOccurs="0"/>
                  <xs:element name="quantity" type="xs:positiveInteger"/>
                  <xs:element name="price" type="xs:decimal"/>
                </xs:sequence>
              </xs:complexType>
            </xs:element>
          </xs:sequence>
          <xs:attribute name="orderid" type="xs:string" use="required"/>
        </xs:complexType>
      </xs:element>
    </xs:schema>
Add the JAXB Maven plugin to generate the JAXB stubs:
    <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>jaxb2-maven-plugin</artifactId>
        <version>2.3.1</version>
        <executions>
            <execution>
                <id>xjc-schema1</id>
                <goals>
                    <goal>xjc</goal>
                </goals>
                <configuration>
                    <!-- XSD files used as sources for stub generation. -->
                    <sources>
                        <source>src/main/resources/xsds/shiporder.xsd</source>
                    </sources>
                    <!-- Package name of the generated sources. -->
                    <packageName>com.example.stubs</packageName>
                    <outputDirectory>src/main/java</outputDirectory>
                    <clearOutputDir>false</clearOutputDir>
                </configuration>
            </execution>
        </executions>
    </plugin>
The stub classes and everything else are ready, so here is the Java DSL JMS message-driven adapter with the JAXB magic:
    /**
     * Sample 3: JMS message-driven adapter with JAXB
     */
    @Bean
    public JmsMessageDrivenChannelAdapter dataEndpoint() {
        final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =
                new ChannelPublishingJmsMessageListener();
        channelPublishingJmsMessageListener.setExpectReply(false);
        // Unmarshal the XML text message into a JAXB stub before it reaches the channel
        channelPublishingJmsMessageListener.setMessageConverter(
                new MarshallingMessageConverter(shipordersMarshaller()));
        final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter =
                new JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener);
        messageDrivenChannelAdapter.setOutputChannel(inboundChannel());
        return messageDrivenChannelAdapter;
    }

    @Bean
    public Jaxb2Marshaller shipordersMarshaller() {
        Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
        // Package that holds the generated stubs
        marshaller.setContextPath("com.example.stubs");
        return marshaller;
    }
Expressing what used to be XML configuration in Java gives you this much power and flexibility. To complete the example, the service activator for inboundChannel looks like this:
    /**
     * Sample 3
     * @param shiporder the unmarshalled JAXB stub
     */
    @ServiceActivator(inputChannel = "inboundChannel")
    public void processMessage(final Shiporder shiporder) {
        System.out.println(shiporder.getOrderid());
        System.out.println(shiporder.getOrderperson());
    }
To test the flow, you can send the following XML to the JMS queue via JConsole:
    <?xml version="1.0" encoding="UTF-8"?>
    <shiporder orderid="889923"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:noNamespaceSchemaLocation="shiporder.xsd">
      <orderperson>John Smith</orderperson>
      <shipto>
        <name>Ola Nordmann</name>
        <address>Langgt 23</address>
        <city>4000 Stavanger</city>
        <country>Norway</country>
      </shipto>
      <item>
        <title>Empire Burlesque</title>
        <note>Special Edition</note>
        <quantity>1</quantity>
        <price>10.90</price>
      </item>
      <item>
        <title>Hide your heart</title>
        <quantity>1</quantity>
        <price>9.90</price>
      </item>
    </shiporder>
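Before pushing a hand-written message onto the queue, it can help to check that it really matches shiporder.xsd. The following is only a minimal sketch that uses the JDK's javax.xml.validation API and nothing from Spring. The class name ShiporderMessageCheck, the isValid helper and the shortened one-item message are my own illustrative assumptions and are not part of the sample project.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import javax.xml.XMLConstants;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import org.xml.sax.SAXException;

public class ShiporderMessageCheck {

    // shiporder.xsd inlined (single-quoted attributes) so the sketch needs no files.
    static final String XSD =
          "<xs:schema xmlns:xs='http://www.w3.org/2001/XMLSchema'>"
        + "<xs:element name='shiporder'><xs:complexType><xs:sequence>"
        + "<xs:element name='orderperson' type='xs:string'/>"
        + "<xs:element name='shipto'><xs:complexType><xs:sequence>"
        + "<xs:element name='name' type='xs:string'/>"
        + "<xs:element name='address' type='xs:string'/>"
        + "<xs:element name='city' type='xs:string'/>"
        + "<xs:element name='country' type='xs:string'/>"
        + "</xs:sequence></xs:complexType></xs:element>"
        + "<xs:element name='item' maxOccurs='unbounded'><xs:complexType><xs:sequence>"
        + "<xs:element name='title' type='xs:string'/>"
        + "<xs:element name='note' type='xs:string' minOccurs='0'/>"
        + "<xs:element name='quantity' type='xs:positiveInteger'/>"
        + "<xs:element name='price' type='xs:decimal'/>"
        + "</xs:sequence></xs:complexType></xs:element>"
        + "</xs:sequence>"
        + "<xs:attribute name='orderid' type='xs:string' use='required'/>"
        + "</xs:complexType></xs:element></xs:schema>";

    // Shortened, hypothetical test message with a single item.
    static final String SAMPLE =
          "<shiporder orderid='889923'>"
        + "<orderperson>John Smith</orderperson>"
        + "<shipto><name>Ola Nordmann</name><address>Langgt 23</address>"
        + "<city>4000 Stavanger</city><country>Norway</country></shipto>"
        + "<item><title>Empire Burlesque</title><quantity>1</quantity><price>10.90</price></item>"
        + "</shiporder>";

    // Compiles the inlined XSD; a failure here means the schema itself is broken.
    static Schema loadSchema() {
        try {
            SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
            return factory.newSchema(new StreamSource(new StringReader(XSD)));
        } catch (SAXException e) {
            throw new IllegalStateException("shiporder.xsd could not be compiled", e);
        }
    }

    // Returns true if the XML is well-formed and valid against the schema.
    static boolean isValid(String xml) {
        try {
            loadSchema().newValidator().validate(new StreamSource(new StringReader(xml)));
            return true;
        } catch (SAXException e) {
            return false; // malformed or schema-invalid message
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("sample valid: " + isValid(SAMPLE));
        // Dropping the required orderid attribute makes the message invalid.
        System.out.println("without orderid valid: "
                + isValid(SAMPLE.replace(" orderid='889923'", "")));
    }
}
```

The check runs outside the integration flow on purpose: it only tells you whether a test message is worth sending, it does not change how the adapter converts messages.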
Sample 4: JMS message-driven channel adapter with JAXB and payload root routing
Another typical scenario is to accept XML as a JMS text message, convert it to JAXB stubs and route the payload to a service activator based on the payload root type. The SI Java DSL of course supports all kinds of routing; I will show you how to route by payload type.
First, add the following XSD to the folder that contains shiporder.xsd and name it purchaseorder.xsd:
    <xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema"
                xmlns:tns="http://tempuri.org/PurchaseOrderSchema.xsd"
                targetNamespace="http://tempuri.org/PurchaseOrderSchema.xsd"
                elementFormDefault="qualified">
      <xsd:element name="PurchaseOrder">
        <xsd:complexType>
          <xsd:sequence>
            <xsd:element name="ShipTo" type="tns:USAddress" maxOccurs="2"/>
            <xsd:element name="BillTo" type="tns:USAddress"/>
          </xsd:sequence>
          <xsd:attribute name="OrderDate" type="xsd:date"/>
        </xsd:complexType>
      </xsd:element>
      <xsd:complexType name="USAddress">
        <xsd:sequence>
          <xsd:element name="name" type="xsd:string"/>
          <xsd:element name="street" type="xsd:string"/>
          <xsd:element name="city" type="xsd:string"/>
          <xsd:element name="state" type="xsd:string"/>
          <xsd:element name="zip" type="xsd:integer"/>
        </xsd:sequence>
        <xsd:attribute name="country" type="xsd:NMTOKEN" fixed="US"/>
      </xsd:complexType>
    </xsd:schema>
Then add it to the JAXB Maven plugin configuration:
    <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>jaxb2-maven-plugin</artifactId>
        <version>2.3.1</version>
        <executions>
            <execution>
                <id>xjc-schema1</id>
                <goals>
                    <goal>xjc</goal>
                </goals>
                <configuration>
                    <!-- XSD files used as sources for stub generation. -->
                    <sources>
                        <source>src/main/resources/xsds/shiporder.xsd</source>
                        <source>src/main/resources/xsds/purchaseorder.xsd</source>
                    </sources>
                    <!-- Package name of the generated sources. -->
                    <packageName>com.example.stubs</packageName>
                    <outputDirectory>src/main/java</outputDirectory>
                    <clearOutputDir>false</clearOutputDir>
                </configuration>
            </execution>
        </executions>
    </plugin>
Run mvn clean install to generate the JAXB stubs for the new XSD. Now for the promised payload root mapping:
    @Bean
    public Jaxb2Marshaller ordersMarshaller() {
        Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
        marshaller.setContextPath("com.example.stubs");
        return marshaller;
    }

    /**
     * Sample 4: JMS message-driven adapter with JAXB and payload routing.
     */
    @Bean
    public JmsMessageDrivenChannelAdapter dataEndpoint() {
        final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =
                new ChannelPublishingJmsMessageListener();
        channelPublishingJmsMessageListener.setMessageConverter(
                new MarshallingMessageConverter(ordersMarshaller()));
        final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter =
                new JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener);
        messageDrivenChannelAdapter.setOutputChannel(inboundChannel());
        return messageDrivenChannelAdapter;
    }

    @Bean
    public IntegrationFlow payloadRootMapping() {
        return IntegrationFlows.from(inboundChannel())
                // Route on the runtime class of the payload
                .<Object, Class<?>>route(Object::getClass, m -> m
                        .subFlowMapping(Shiporder.class, sf -> sf.handle((MessageHandler) message -> {
                            final Shiporder shiporder = (Shiporder) message.getPayload();
                            System.out.println(shiporder.getOrderperson());
                            System.out.println(shiporder.getOrderid());
                        }))
                        .subFlowMapping(PurchaseOrder.class, sf -> sf.handle((MessageHandler) message -> {
                            final PurchaseOrder purchaseOrderType = (PurchaseOrder) message.getPayload();
                            System.out.println(purchaseOrderType.getBillTo().getName());
                        }))
                ).get();
    }
Note the payloadRootMapping bean. Let me explain the important parts:
- <Object, Class<?>>route: the input coming from inboundChannel is an Object, and routing is performed on its Class<?>
- subFlowMapping(Shiporder.class, ...: handles Shiporders.
- subFlowMapping(PurchaseOrder.class, ...: handles PurchaseOrders.
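To make the idea behind routing by payload class more tangible, here is a plain-Java sketch of a class-keyed dispatch table. It is not how Spring Integration implements its router; it only mirrors the idea of "look up the payload's class, then run the mapped subflow". PayloadTypeRouterSketch, its subFlowMapping and route methods, and the two tiny stand-in payload classes are hypothetical names chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class PayloadTypeRouterSketch {

    // Hypothetical stand-ins for the generated JAXB stubs.
    static final class Shiporder {
        final String orderid;
        Shiporder(String orderid) { this.orderid = orderid; }
    }

    static final class PurchaseOrder {
        final String billToName;
        PurchaseOrder(String billToName) { this.billToName = billToName; }
    }

    // Routing table: exact payload class -> handler ("subflow").
    private final Map<Class<?>, Function<Object, String>> subflows = new HashMap<>();

    // Registers a typed handler; the cast is safe because the key is the same class.
    <T> PayloadTypeRouterSketch subFlowMapping(Class<T> type, Function<T, String> subflow) {
        subflows.put(type, payload -> subflow.apply(type.cast(payload)));
        return this;
    }

    // Looks up the handler by the payload's runtime class and runs it.
    String route(Object payload) {
        Function<Object, String> subflow = subflows.get(payload.getClass());
        if (subflow == null) {
            // This sketch simply rejects unmapped types.
            throw new IllegalArgumentException(
                    "No subflow mapped for " + payload.getClass().getName());
        }
        return subflow.apply(payload);
    }

    static PayloadTypeRouterSketch sampleRouter() {
        return new PayloadTypeRouterSketch()
                .subFlowMapping(Shiporder.class, o -> "shiporder " + o.orderid)
                .subFlowMapping(PurchaseOrder.class, o -> "purchase order billed to " + o.billToName);
    }

    public static void main(String[] args) {
        PayloadTypeRouterSketch router = sampleRouter();
        System.out.println(router.route(new Shiporder("889923")));  // shiporder 889923
        System.out.println(router.route(new PurchaseOrder("name1"))); // purchase order billed to name1
    }
}
```

Note that the lookup in this sketch matches the exact class only, so a subclass of a mapped type would not be routed.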
To test the Shiporder payload, use the XML from Sample 3. To test the PurchaseOrder payload, use the following XML:
    <?xml version="1.0" encoding="UTF-8"?>
    <PurchaseOrder OrderDate="1900-01-01" xmlns="http://tempuri.org/PurchaseOrderSchema.xsd">
      <ShipTo country="US">
        <name>name1</name>
        <street>street1</street>
        <city>city1</city>
        <state>state1</state>
        <zip>1</zip>
      </ShipTo>
      <ShipTo country="US">
        <name>name2</name>
        <street>street2</street>
        <city>city2</city>
        <state>state2</state>
        <zip>-79228162514264337593543950335</zip>
      </ShipTo>
      <BillTo country="US">
        <name>name1</name>
        <street>street1</street>
        <city>city1</city>
        <state>state1</state>
        <zip>1</zip>
      </BillTo>
    </PurchaseOrder>
Both payloads should be routed according to the subflow mapping.
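One detail of the purchase order test message deserves a remark: the second ship-to address carries the zip value -79228162514264337593543950335. That is still a valid xsd:integer, because the type is unbounded, and as far as I know xjc binds xsd:integer to java.math.BigInteger by default, so the generated stub can hold it. The sketch below only demonstrates why a long would not be enough; ZipValueCheck and fitsInLong are hypothetical names of mine.

```java
import java.math.BigInteger;

public class ZipValueCheck {

    // Returns true if the lexical integer value can be stored in a long without loss.
    static boolean fitsInLong(String lexicalValue) {
        try {
            new BigInteger(lexicalValue).longValueExact();
            return true;
        } catch (ArithmeticException outOfRange) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(fitsInLong("1"));                              // true
        System.out.println(fitsInLong("-79228162514264337593543950335")); // false
    }
}
```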
Sample 5: IntegrationFlowAdapter
Besides the other implementations of Enterprise Integration Patterns (check them out), I need to mention IntegrationFlowAdapter. You extend this class and implement the buildFlow method, like this:
    @Component
    public class MyFlowAdapter extends IntegrationFlowAdapter {

        @Autowired
        private ConnectionFactory rabbitConnectionFactory;

        @Override
        protected IntegrationFlowDefinition<?> buildFlow() {
            return from(Amqp.inboundAdapter(this.rabbitConnectionFactory, "myQueue"))
                    .<String, String>transform(String::toLowerCase)
                    .channel(c -> c.queue("myFlowAdapterOutput"));
        }
    }
This way you can wrap repeated bean declarations into one component and give them the flow they need. Such a component can then be configured and handed to the calling code as a single class instance!
So let's make Sample 3 from this repo a bit shorter: define a base class for all JMS endpoints and declare the repeated beans in it:
    public class JmsEndpoint extends IntegrationFlowAdapter {

        private final String queueName;
        private final String channelName;
        private final String contextPath;

        /**
         * @param queueName   JMS queue to listen on
         * @param channelName channel the unmarshalled payload is sent to
         * @param contextPath package of the JAXB stubs
         */
        public JmsEndpoint(String queueName, String channelName, String contextPath) {
            this.queueName = queueName;
            this.channelName = channelName;
            this.contextPath = contextPath;
        }

        @Override
        protected IntegrationFlowDefinition<?> buildFlow() {
            return from(Jms.messageDrivenChannelAdapter(listenerContainer())
                    .jmsMessageConverter(new MarshallingMessageConverter(shipordersMarshaller()))
            ).channel(channelName);
        }

        @Bean
        public Jaxb2Marshaller shipordersMarshaller() {
            Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
            marshaller.setContextPath(contextPath);
            return marshaller;
        }

        @Bean
        public DynamicDestinationResolver dynamicDestinationResolver() {
            return new DynamicDestinationResolver();
        }

        @Bean
        public ActiveMQConnectionFactory connectionFactory() {
            return new ActiveMQConnectionFactory();
        }

        @Bean
        public DefaultMessageListenerContainer listenerContainer() {
            final DefaultMessageListenerContainer defaultMessageListenerContainer =
                    new DefaultMessageListenerContainer();
            defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());
            defaultMessageListenerContainer.setConnectionFactory(connectionFactory());
            defaultMessageListenerContainer.setDestinationName(queueName);
            return defaultMessageListenerContainer;
        }

        @Bean
        public MessageChannel inboundChannel() {
            return MessageChannels.direct(channelName).get();
        }
    }
Declaring a JMS endpoint for a specific queue is now easy:
    @Bean
    public JmsEndpoint jmsEndpoint() {
        return new JmsEndpoint("jms.activemq.test", "inboundChannel", "com.example.stubs");
    }
The service activator for inboundChannel:
    /**
     * Samples 3 and 5
     * @param shiporder the unmarshalled JAXB stub
     */
    @ServiceActivator(inputChannel = "inboundChannel")
    public void processMessage(final Shiporder shiporder) {
        System.out.println(shiporder.getOrderid());
        System.out.println(shiporder.getOrderperson());
    }
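You should not miss out on using IntegrationFlowAdapter in your projects. I like the concept.
I recently started using the Spring Integration Java DSL in a new Spring Boot based project at Embedit. Even with only a few configurations in place, I have found it very useful:
- It is easy to debug, with no need to add configuration such as a wireTap.
- It is much easier to read. Yes, even with lambdas!
- It is powerful. In Java config you now have plenty of options.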