Camel中Splitter和Aggregator的使用 博客分类: Camel OSGIcamelservicemix
最近公司要用camel来切分字符串,然后聚合起来,判断是否处理完成。容器是用的Servicemix。
blueprint.xml
<?xml version="1.0" encoding="UTF-8"?> <blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd" xmlns:ns2="null" xmlns:ns3="null" xmlns:ns4="null" xmlns:ns5="null" xmlns:ns6="null" xmlns:ns7="null"> <bean id="Aggregated" class="com.bosame.TicketCenter.Application.OrderProcedure.Aggregated"> </bean> <bean id="aggregatorStrategy" class="com.bosame.TicketCenter.Application.OrderProcedure.StringAggregationStrategy"/> <bean id="OrderSplitBean" class="com.bosame.TicketCenter.Application.OrderProcedure.OrderSplit"></bean> <camelContext id="OrderCamel" xmlns="http://camel.apache.org/schema/blueprint" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://camel.apache.org/schema/blueprint http://camel.apache.org/schema/blueprint/camel-blueprint.xsd"> <route> <from uri="vm:ReceiveBatchOrder"/> <setHeader headerName="gid"> <simple>${date:now:HH_mm_ss}</simple> </setHeader> <split> <method bean="OrderSplitBean" method="splitOrders"/> <setHeader headerName="gid"> <simple>${in.header.gid}</simple> </setHeader> <to uri="vm:ReceiveOrder"/> </split> </route> <route> <from uri="vm:ReceiveOrder"/> <to uri="bean:OrderVerifierBean" /> <aggregate strategyRef="aggregatorStrategy" eagerCheckCompletion="true" completionTimeout="3000"> <correlationExpression> <header>gid</header> </correlationExpression> <completionSize> <header>CamelSplitSize</header> </completionSize> <to uri="bean:Aggregated"/> </aggregate> </route> </camelContext> </blueprint>
这里切分的时候是使用的一个bean对象来切分的,也可以使用正则表达式或者字符串。
就是把"<method bean="OrderSplitBean" method="splitOrders"/>"这个改成"<tokenize token="@"/>",切分的时候就会以"@"来切分。
加入<setHeader headerName="gid">
<simple>${date:now:HH_mm_ss}</simple>
</setHeader>
这行代码是为了方便聚合的时候使用,因为camel聚合的时候是根据头部相同的来聚合的。
<correlationExpression>
<header>gid</header>
</correlationExpression>
这个是表示聚合条件,以头部相同的来聚合。
<completionSize>
<header>CamelSplitSize</header>
</completionSize>
这个是聚合完成条件。以切分的大小为完成条件。CamelSplitSize这个参数在camel的2.9以后已经加入到了camel的头部header 里面。详细请参考链接http://camel.apache.org/splitter.html
completionTimeout="3000"这个参数是超时时间,当3秒钟还没聚合完时,就算超时。
strategyRef="aggregatorStrategy" 这个是聚合的策略,要自己编写一个类继承AggregationStrategy的。
里面就写要怎样聚合,我这里是简单把2个值相加。
<to uri="bean:Aggregated"/>这个是最后聚合完成了,要跳转的bean。可以自己随便写。
OrderSplit.java
package com.TicketCenter.Application.OrderProcedure; import java.util.List; /** * 切分订单 * @author lym */ public class OrderSplit { public List<String> splitOrders(List<String> orderStrList) { return orderStrList; } }
StringAggregationStrategy.java
package com.TicketCenter.Application.OrderProcedure; import org.apache.camel.Exchange; import org.apache.camel.processor.aggregate.AggregationStrategy; /** * * @author Administrator */ public class StringAggregationStrategy implements AggregationStrategy { @Override public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { if (oldExchange == null) { return newExchange; } oldExchange.getIn().setBody(newExchange.getIn().getBody(Order.class)); return oldExchange; } }