Mule ESB 学习笔记(18)Mule和ATOM的整合
程序员文章站
2022-05-13 13:25:55
...
定时读取特定文件的rss文件:
<?xml version="1.0" encoding="UTF-8"?> <mule xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:atom="http://www.mulesoft.org/schema/mule/atom" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:file="http://www.mulesoft.org/schema/mule/file" xsi:schemaLocation=" http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd http://www.mulesoft.org/schema/mule/atom http://www.mulesoft.org/schema/mule/atom/current/mule-atom.xsd http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd"> <flow name="feedconcumes"> <file:inbound-endpoint path="/e:/upload/data" pollingFrequency="1000" /> <file:filename-wildcard-filter pattern="*.atom"/> <atom:feed-splitter/> </flow> </mule>
定时读取特定网址的新rss信息:
<?xml version="1.0" encoding="UTF-8"?> <mule xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:atom="http://www.mulesoft.org/schema/mule/atom" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:test="http://www.mulesoft.org/schema/mule/test" xsi:schemaLocation=" http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd http://www.mulesoft.org/schema/mule/atom http://www.mulesoft.org/schema/mule/atom/current/mule-atom.xsd http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd http://www.mulesoft.org/schema/mule/test http://www.mulesoft.org/schema/mule/test/current/mule-test.xsd"> <flow name="feedConsumer"> <poll frequency="1000"> <http:outbound-endpoint address="http://topmanopensource.iteye.com/rss" method="GET"/> </poll> <atom:feed-splitter/> </flow> </mule>
jms将消息发送特定的组件
<?xml version="1.0" encoding="UTF-8"?> <mule xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:atom="http://www.mulesoft.org/schema/mule/atom" xmlns:jms="http://www.mulesoft.org/schema/mule/jms" xmlns:spring="http://www.springframework.org/schema/beans" xmlns:test="http://www.mulesoft.org/schema/mule/test" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd http://www.mulesoft.org/schema/mule/atom http://www.mulesoft.org/schema/mule/atom/current/mule-atom.xsd http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/current/mule-jms.xsd http://www.mulesoft.org/schema/mule/test http://www.mulesoft.org/schema/mule/test/current/mule-test.xsd"> <jms:activemq-connector name="jmsConnectorNoRedelivery" maxRedelivery="-1" /> <spring:beans> <spring:bean id="feedConsumer" class="com.easyway.esb.mule.rss.FeedReceiver" /> <spring:bean id="entryReceiver" class="com.easyway.esb.mule.rss.EntryReceiver" /> </spring:beans> <flow name="feedConsumerFlow"> <jms:inbound-endpoint queue="feed.in" connector-ref="jmsConnectorNoRedelivery"/> <component> <spring-object bean="feedConsumer"/> </component> </flow> <flow name="feedSplitterConsumerFlow"> <jms:inbound-endpoint queue="feed.split.in" connector-ref="jmsConnectorNoRedelivery"> <atom:feed-splitter/> </jms:inbound-endpoint> <component> <spring-object bean="entryReceiver"/> </component> </flow> </mule>
package com.easyway.esb.mule.rss; import org.mule.api.annotations.expressions.Expr; import org.mule.api.annotations.param.Payload; import java.util.concurrent.atomic.AtomicInteger; import org.apache.abdera.model.Entry; import org.apache.abdera.model.Feed; public class EntryReceiver { private AtomicInteger receivedEntries = new AtomicInteger(0); public void processEntry(@Payload Entry entry, @Expr("#[header:invocation:feed.object]") Feed feed) throws Exception { System.out.println("Received " + receivedEntries.incrementAndGet() + " of " + feed.getEntries().size() + " entries"); } public int getCount() { return receivedEntries.get(); } public AtomicInteger getReceivedEntries() { return receivedEntries; } }
package com.easyway.esb.mule.rss; import org.mule.api.annotations.param.Payload; import java.util.concurrent.atomic.AtomicInteger; import org.apache.abdera.model.Feed; public class FeedReceiver { private final AtomicInteger receivedEntries = new AtomicInteger(0); public void processFeed(@Payload Feed feed) throws Exception { receivedEntries.set(0); System.out.println("Received " + feed.getEntries().size() + " events"); receivedEntries.set(feed.getEntries().size()); } public int getCount() { return receivedEntries.get(); } public AtomicInteger getReceivedEntries() { return receivedEntries; } }
测试类:
import java.io.File; import java.io.InputStream; import java.io.StringReader; import org.apache.commons.io.FileUtils; import org.mule.api.MuleContext; import org.mule.api.client.MuleClient; import org.mule.api.context.MuleContextFactory; import org.mule.config.spring.SpringXmlConfigurationBuilder; import org.mule.context.DefaultMuleContextFactory; public class MuleJmsAtomMain { public static void main(String[] args) { try { String configFile = "jms-atom-consume.xml"; System.setProperty("mule.verbose.exceptions","true"); String[] configFileArr = new String[] {configFile }; MuleContextFactory muleContextFactory = new DefaultMuleContextFactory(); MuleContext muleContext = muleContextFactory .createMuleContext(new SpringXmlConfigurationBuilder(configFileArr)); muleContext.start(); MuleClient client = muleContext.getClient(); FeedReceiver component = (FeedReceiver)muleContext.getRegistry().get("feedConsumer"); String path=MuleJmsAtomMain.class.getClassLoader().getResource("./sample-feed.atom").getFile(); String feed=FileUtils.readFileToString(new File(path)); client.dispatch("jms://feed.in", feed, null); Thread.sleep(2000); System.out.println(component.getCount()); client = muleContext.getClient(); component = (FeedReceiver)muleContext.getRegistry().get("feedConsumer"); component.getReceivedEntries().set(0); //reset since the build reports that it's getting incremented someplace else client.dispatch("jms://feed.split.in", feed, null); Thread.sleep(5000); System.out.println(component.getCount()); } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } } }
上一篇: RSS格式详解
下一篇: php调用COM组件,php调用组件