欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Camel中Splitter和Aggregator的使用 博客分类: Camel OSGIcamelservicemix

程序员文章站 2024-02-22 14:07:58
...

最近公司要用camel来切分字符串,然后聚合起来,判断是否处理完成。容器是用的Servicemix。

blueprint.xml

<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" 
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
           xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd" xmlns:ns2="null" xmlns:ns3="null" xmlns:ns4="null" xmlns:ns5="null" xmlns:ns6="null" xmlns:ns7="null">
    
    <bean id="Aggregated" class="com.bosame.TicketCenter.Application.OrderProcedure.Aggregated">
    </bean>

    <bean id="aggregatorStrategy" class="com.bosame.TicketCenter.Application.OrderProcedure.StringAggregationStrategy"/>

    <bean id="OrderSplitBean" class="com.bosame.TicketCenter.Application.OrderProcedure.OrderSplit"></bean>

    <camelContext id="OrderCamel"   xmlns="http://camel.apache.org/schema/blueprint" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                    xsi:schemaLocation="http://camel.apache.org/schema/blueprint http://camel.apache.org/schema/blueprint/camel-blueprint.xsd">
        <route>
            <from uri="vm:ReceiveBatchOrder"/>
            <setHeader headerName="gid">
                <simple>${date:now:HH_mm_ss}</simple>
            </setHeader>
            <split>
                <method bean="OrderSplitBean" method="splitOrders"/>
                <setHeader headerName="gid">
                    <simple>${in.header.gid}</simple>
                </setHeader>
                <to uri="vm:ReceiveOrder"/>
            </split>
        </route>
        <route>
            <from uri="vm:ReceiveOrder"/>
            <to uri="bean:OrderVerifierBean" />
            <aggregate strategyRef="aggregatorStrategy" eagerCheckCompletion="true" completionTimeout="3000">
                <correlationExpression>
                    <header>gid</header>
                </correlationExpression>
                <completionSize>
                    <header>CamelSplitSize</header>
                </completionSize>
                <to uri="bean:Aggregated"/>
            </aggregate>
        </route>
    </camelContext>
</blueprint>

 

这里切分的时候是使用的一个bean对象来切分的,也可以使用正则表达式或者字符串。

就是把"<method bean="OrderSplitBean" method="splitOrders"/>"这个改成"<tokenize token="@"/>",切分的时候就会以"@"来切分。

加入<setHeader headerName="gid">
                <simple>${date:now:HH_mm_ss}</simple>
        </setHeader>
这行代码是为了方便聚合的时候使用,因为camel聚合的时候是根据头部相同的来聚合的。

<correlationExpression>
                    <header>gid</header>
</correlationExpression>
这个是表示聚合条件,以头部相同的来聚合。

<completionSize>
                    <header>CamelSplitSize</header>
</completionSize>
这个是聚合完成条件。以切分的大小为完成条件。CamelSplitSize这个参数在camel的2.9以后已经加入到了camel的头部header 里面。详细请参考链接http://camel.apache.org/splitter.html

completionTimeout="3000"这个参数是超时时间,当3秒钟还没聚合完时,就算超时。
strategyRef="aggregatorStrategy" 这个是聚合的策略,要自己编写一个类继承AggregationStrategy的。

里面就写要怎样聚合,我这里是简单把2个值相加。

<to uri="bean:Aggregated"/>这个是最后聚合完成了,要跳转的bean。可以自己随便写。

OrderSplit.java

package com.TicketCenter.Application.OrderProcedure;

import java.util.List;

/**
 * 切分订单
 * @author lym
 */
public class OrderSplit {
    public List<String> splitOrders(List<String> orderStrList) {
        return orderStrList;
    }
}

StringAggregationStrategy.java

package com.TicketCenter.Application.OrderProcedure;

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

/**
 *
 * @author Administrator
 */
public class StringAggregationStrategy implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            return newExchange;
        }
        oldExchange.getIn().setBody(newExchange.getIn().getBody(Order.class));
        return oldExchange;
    }
}