欢迎您访问程序员文章站,本站旨在为大家提供、分享程序员计算机编程知识!
您现在的位置是: 首页

spring boot集成spring integration 博客分类: spring boot、spring integration

程序员文章站 2024-03-12 14:10:14
...

主要使用integration的聚合器

1.pom.xml关键jar引用

 

<!-- Spring Boot web starter; no version given, so it is expected to come from the
     project's dependency management. -->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Integration starter plus the HTTP and stream modules used by the flow config. -->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.integration</groupId>
	<artifactId>spring-integration-http</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.integration</groupId>
	<artifactId>spring-integration-stream</artifactId>
</dependency>
<!-- fastjson parses response bodies received from a remote HTTP endpoint, i.e. untrusted
     input. Upgraded from 1.2.51: releases before 1.2.83 are affected by published autoType
     deserialization vulnerabilities. -->
<dependency>
	<groupId>com.alibaba</groupId>
	<artifactId>fastjson</artifactId>
	<version>1.2.83</version>
</dependency>
<dependency>
	<groupId>commons-io</groupId>
	<artifactId>commons-io</artifactId>
	<version>2.6</version>
</dependency>
<dependency>
	<groupId>org.apache.httpcomponents</groupId>
	<artifactId>httpmime</artifactId>
	<version>4.5.6</version>
</dependency>
<!-- NOTE(review): "provided" scope means the runtime environment must supply logback-core;
     confirm this is intended for an executable Spring Boot jar. -->
<dependency>
	<groupId>ch.qos.logback</groupId>
	<artifactId>logback-core</artifactId>
	<scope>provided</scope>
</dependency>

 2.http-inbound-config.xml配置

 

 

<?xml version="1.0" encoding="UTF-8"?>
<!--
	Spring Integration flow with two HTTP entry points (/cc/test and /cc/test2).
	Each entry point queues the request, forwards it to a configurable remote URL,
	converts the remote response to a Result through a service activator and sends
	it to "search-reply", where an aggregator combines the results.
-->
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-http="http://www.springframework.org/schema/integration/http"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns="http://www.springframework.org/schema/beans"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
		http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/integration
		http://www.springframework.org/schema/integration/spring-integration.xsd
		http://www.springframework.org/schema/task
		http://www.springframework.org/schema/task/spring-task.xsd
		http://www.springframework.org/schema/integration/http
		http://www.springframework.org/schema/integration/http/spring-integration-http.xsd
		http://www.springframework.org/schema/context
		http://www.springframework.org/schema/context/spring-context.xsd">

	<!-- Flow A: POST /cc/test, buffered in a queue channel of up to 1000 messages. -->
	<int:channel id="echoChannel">
		<int:queue capacity="1000"/>
	</int:channel>
	<int-http:inbound-gateway request-channel="echoChannel" path="/cc/test"
			supported-methods="POST" request-payload-type="java.lang.String">
		<int-http:cross-origin/>
	</int-http:inbound-gateway>
	<!-- Polls the queue once per second (at most 10 messages per poll), tags each message
	     with the "ssmail" header used later for correlation, then calls the remote URL. -->
	<int:chain input-channel="echoChannel" output-channel="search-request-a">
		<int:poller fixed-rate="1" max-messages-per-poll="10" time-unit="SECONDS">
		</int:poller>
		<int:header-enricher>
			<int:header name="ssmail" value="test"/>
		</int:header-enricher>
		<!-- {testApi} is resolved from the Map returned by the bean named "testApi". -->
		<int-http:outbound-gateway charset="UTF-8" http-method="POST"
				expected-response-type="com.test.integration.splitteraggregator.CriteriaA"
				encode-uri="true" url="{testApi}" uri-variables-expression="@testApi"
				reply-timeout="30000" />
	</int:chain>

	<bean id="serviceActivatorA"
		class="com.test.integration.splitteraggregator.ServiceActivatorA"></bean>
	<int:service-activator input-channel="search-request-a"
		method="service" output-channel="search-reply" ref="serviceActivatorA">
	</int:service-activator>

	<!-- Flow B: POST /cc/test2, same shape as flow A but the remote response is kept as
	     raw bytes and parsed by serviceActivatorB. -->
	<int:channel id="echoChannel2">
		<int:queue capacity="1000"/>
	</int:channel>
	<int:logging-channel-adapter id="logger" level="INFO" log-full-message="true"/>
	<int-http:inbound-gateway request-channel="echoChannel2" path="/cc/test2"
			supported-methods="POST" request-payload-type="java.lang.String">
		<int-http:cross-origin/>
	</int-http:inbound-gateway>
	<int:chain input-channel="echoChannel2" output-channel="search-request-b">
		<int:poller fixed-rate="1" max-messages-per-poll="10" time-unit="SECONDS">
		</int:poller>
		<int:header-enricher>
			<int:header name="ssmail" value="test"/>
		</int:header-enricher>
		<!-- {test2Api} is resolved from the Map returned by the bean named "test2Api". -->
		<int-http:outbound-gateway charset="UTF-8" http-method="POST"
				expected-response-type="byte[]"
				encode-uri="true" url="{test2Api}" uri-variables-expression="@test2Api"
				reply-timeout="30000" />
	</int:chain>

	<!-- <int:channel id="search-request-b"/> -->
	<bean id="serviceActivatorB"
		class="com.test.integration.splitteraggregator.ServiceActivatorB"></bean>
	<int:service-activator input-channel="search-request-b"
		method="service" output-channel="search-reply" ref="serviceActivatorB">
	</int:service-activator>

	<!-- Aggregation: every message on "search-reply" is also copied to the "logger" adapter. -->
	<int:channel id="aggregated-reply"/>
	<int:channel id="search-reply" >
		<int:interceptors>
			<int:wire-tap channel="logger" />
		</int:interceptors>
	</int:channel>
	<!-- Messages are grouped by the "ssmail" header (the same constant value in both flows)
	     and a group is released as soon as it holds two messages. Groups that never reach
	     two messages are released as partial results once the reaper expires them. -->
	<int:aggregator input-channel="search-reply" method="aggregate"
		ref="resultAggregator" output-channel="aggregated-reply"
		message-store="searchResultMessageStore"
		expire-groups-upon-completion="true"
		correlation-strategy-expression="headers['ssmail']"
		expire-groups-upon-timeout="true"
		release-strategy-expression="size() == 2"
		send-partial-result-on-expiry="true">
	</int:aggregator>
	<int:logging-channel-adapter channel="aggregated-reply" level="INFO"
		log-full-message="true"></int:logging-channel-adapter>

	<!-- Define a store for our search results and set up a reaper that will
		periodically expire those results. -->
	<bean id="searchResultMessageStore"
		class="org.springframework.integration.store.SimpleMessageStore" />

	<bean id="searchResultMessageStoreReaper"
		class="org.springframework.integration.store.MessageGroupStoreReaper">
		<property name="messageGroupStore" ref="searchResultMessageStore" />
		<property name="timeout" value="20000" />
	</bean>
	<!-- The reaper is a plain Runnable and does nothing unless its run() method is invoked,
	     so it is scheduled here. Running every 10 seconds with a 20 second timeout means an
	     incomplete group is expired 20 to 30 seconds after it was created. -->
	<task:scheduled-tasks>
		<task:scheduled ref="searchResultMessageStoreReaper" method="run" fixed-rate="10000"/>
	</task:scheduled-tasks>

	<bean id="resultAggregator"
		class="com.test.integration.splitteraggregator.ResultAggregator" />

</beans>

 说明:path="/cc/test"是本地监听路径,url="{testApi}"即为实际访问路径(请求会被转发到该地址);其响应结果返回到search-request-a这个通道中,再由serviceActivatorA处理结果集,最后交给search-reply通道,由聚合器聚合数据。

 

3.相关java代码

//响应信息
/**
 * Result payload carried inside a remote response: a title and its content.
 *
 * <p>Accessors are provided so that the private fields can actually be populated
 * and read, for example by JSON binding or by the aggregation code.
 */
public class Result {
	private String title;
	private String content;

	/** @return the title, or {@code null} if none has been set */
	public String getTitle() {
		return title;
	}

	/** @param title the title to store */
	public void setTitle(String title) {
		this.title = title;
	}

	/** @return the content, or {@code null} if none has been set */
	public String getContent() {
		return content;
	}

	/** @param content the content to store */
	public void setContent(String content) {
		this.content = content;
	}
}

/**
 * Response envelope of the first remote system.
 *
 * <p>The field set mirrors that system's response attributes. Accessors are
 * provided because {@code ServiceActivatorA} reads {@code getSuccess()} and
 * {@code getResult()}, and because the fields are otherwise unreachable.
 */
public class CriteriaA {
	private Boolean success; // defined according to the responding system's attributes
	private String code;
	private String error;
	private Result result;

	/** @return the success flag; may be {@code null} when the response omitted it */
	public Boolean getSuccess() {
		return success;
	}

	/** @param success the success flag reported by the remote system */
	public void setSuccess(Boolean success) {
		this.success = success;
	}

	/** @return the response code, or {@code null} if none has been set */
	public String getCode() {
		return code;
	}

	/** @param code the response code */
	public void setCode(String code) {
		this.code = code;
	}

	/** @return the error text, or {@code null} if none has been set */
	public String getError() {
		return error;
	}

	/** @param error the error text */
	public void setError(String error) {
		this.error = error;
	}

	/** @return the nested result, or {@code null} if none has been set */
	public Result getResult() {
		return result;
	}

	/** @param result the nested result */
	public void setResult(Result result) {
		this.result = result;
	}
}

/**
 * Response envelope of the second remote system.
 *
 * <p>The field set mirrors that system's response attributes. Accessors are
 * provided because {@code ServiceActivatorB} reads {@code getCode()} and
 * {@code getResult()}, and because the fields are otherwise unreachable.
 */
public class CriteriaB {
	private Integer code; // defined according to the responding system's attributes
	private String errmsg;
	private Result result;

	/** @return the numeric response code; may be {@code null} when the response omitted it */
	public Integer getCode() {
		return code;
	}

	/** @param code the numeric response code */
	public void setCode(Integer code) {
		this.code = code;
	}

	/** @return the error message, or {@code null} if none has been set */
	public String getErrmsg() {
		return errmsg;
	}

	/** @param errmsg the error message */
	public void setErrmsg(String errmsg) {
		this.errmsg = errmsg;
	}

	/** @return the nested result, or {@code null} if none has been set */
	public Result getResult() {
		return result;
	}

	/** @param result the nested result */
	public void setResult(Result result) {
		this.result = result;
	}
}

 

//响应信息处理,处理方式根据系统响应结果而定
/**
 * Converts the first remote system's response into a {@link Result}.
 */
public class ServiceActivatorA {
	/**
	 * Extracts the nested result from a successful response.
	 *
	 * @param criteria the converted remote response; may be {@code null}
	 * @return the response's result when the success flag is {@code true} and a
	 *         result is present; otherwise a new, empty {@link Result}
	 */
	public Result service(CriteriaA criteria) {
		// Boolean.TRUE.equals(...) avoids an unboxing NullPointerException when the
		// remote system omits the success flag.
		if (criteria != null
				&& Boolean.TRUE.equals(criteria.getSuccess())
				&& criteria.getResult() != null) {
			return criteria.getResult();
		}
		return new Result();
	}
}

/**
 * Converts the second remote system's raw response bytes into a {@link Result}.
 */
public class ServiceActivatorB {
	/** Response code for which the nested result of the response is used. */
	private static final int ACCEPTED_CODE = 10000;

	private static final java.util.logging.Logger LOG =
			java.util.logging.Logger.getLogger(ServiceActivatorB.class.getName());

	/**
	 * Decodes the payload as UTF-8 JSON and extracts its nested result.
	 *
	 * @param data the raw response body; may be {@code null} or empty
	 * @return the parsed response's result when its code equals 10000 and a result
	 *         is present; otherwise (including when the payload cannot be parsed)
	 *         a new, empty {@link Result}
	 */
	public Result service(byte[] data) {
		if (data == null || data.length == 0) {
			return new Result();
		}
		try {
			// The Charset overload cannot throw UnsupportedEncodingException.
			String dataStr = new String(data, java.nio.charset.StandardCharsets.UTF_8);
			CriteriaB criteria = JSON.parseObject(dataStr, CriteriaB.class);
			// Explicit null checks: the parser may yield null, and the code is a
			// nullable Integer that must not be unboxed blindly.
			if (criteria != null
					&& criteria.getCode() != null
					&& criteria.getCode().intValue() == ACCEPTED_CODE
					&& criteria.getResult() != null) {
				return criteria.getResult();
			}
		} catch (RuntimeException e) {
			// Best effort: a malformed response yields an empty result, but the failure
			// is logged with its cause instead of being dumped to stderr.
			LOG.log(java.util.logging.Level.WARNING, "Failed to parse response payload", e);
		}
		return new Result();
	}
}

 

//聚合结果集
/**
 * A {@link Result} that additionally holds a collection of other results.
 */
public class CompositeResult extends Result {
	/** Backing collection; starts out empty and is exposed directly to callers. */
	private Collection<Result> results = new ArrayList<>();

	/**
	 * @return the live, mutable collection of contained results
	 */
	public Collection<Result> getResults() {
		return this.results;
	}
}

/**
 * Combines a group of {@link Result} objects into a single {@link CompositeResult}.
 */
public class ResultAggregator {
	/**
	 * Copies all given results into a new composite and prints how many it holds.
	 *
	 * @param results the results to combine
	 * @return a {@link CompositeResult} containing every element of {@code results}
	 */
	public Result aggregate(Collection<Result> results) {
		CompositeResult composite = new CompositeResult();
		Collection<Result> collected = composite.getResults();
		collected.addAll(results);
		System.out.println("result size:" + collected.size());
		// Business-specific processing goes here...
		return composite;
	}
}

 另外说明uri-variables-expression动态配置:获取bean对象返回值(该值必须为map),然后通过{xxx}这个方式获取,关键代码:

/**
 * Exposes the configured remote URLs as single-entry maps, one bean per URL.
 * Each map's key is identical to its bean name.
 */
@Configuration
public class UrlConfig {
	@Value("${url.testApi}")
	private String testApi;

	@Value("${url.test2Api}")
	private String test2Api;

	/**
	 * @return a mutable map holding the {@code url.testApi} property under the key "testApi"
	 */
	@Bean(name = "testApi")
	public Map<String, String> getTestApi() {
		return singleEntry("testApi", testApi);
	}

	/**
	 * @return a mutable map holding the {@code url.test2Api} property under the key "test2Api"
	 */
	@Bean(name = "test2Api")
	public Map<String, String> getTest2Api() {
		return singleEntry("test2Api", test2Api);
	}

	/** Builds a new HashMap containing exactly the given key/value pair. */
	private static Map<String, String> singleEntry(String key, String value) {
		Map<String, String> variables = new HashMap<>();
		variables.put(key, value);
		return variables;
	}
}

4.项目启动

/**
 * Application entry point. Loads the integration flow defined in
 * {@code http-inbound-config.xml} from the classpath.
 */
@EnableIntegration
@ImportResource("classpath:http-inbound-config.xml")
@SpringBootApplication
public class HttpApplication {

	/**
	 * Starts the Spring Boot application.
	 *
	 * @param args command-line arguments, passed through to Spring Boot
	 */
	public static void main(String[] args) {
		SpringApplication.run(HttpApplication.class, args);
	}
}

  

5.访问,注意要保证跳转的路径(testApi,test2Api)能访问

可通过Restlet Client访问,post请求输入localhost:8080/cc/test,localhost:8080/cc/test2,若有参数就输入参数

spring boot集成spring integration
            
    
    博客分类: spring bootspring integration spring integration 

  • spring boot集成spring integration
            
    
    博客分类: spring bootspring integration spring integration 
  • 大小: 22 KB
相关标签: spring integration