欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Mule ESB 学习笔记(18)Mule和ATOM的整合

程序员文章站 2022-05-13 13:25:55
...

 

定时读取特定文件的rss文件:

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:atom="http://www.mulesoft.org/schema/mule/atom"
      xmlns:http="http://www.mulesoft.org/schema/mule/http"
      xmlns:file="http://www.mulesoft.org/schema/mule/file"
      xsi:schemaLocation="
       http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd      
        http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/atom http://www.mulesoft.org/schema/mule/atom/current/mule-atom.xsd
        http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">

    <flow name="feedconcumes">
       <file:inbound-endpoint  path="/e:/upload/data" pollingFrequency="1000" />
       <file:filename-wildcard-filter pattern="*.atom"/>
       <atom:feed-splitter/>
    </flow>
</mule>

 

 

定时读取特定网址的新rss信息:

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:atom="http://www.mulesoft.org/schema/mule/atom"
      xmlns:http="http://www.mulesoft.org/schema/mule/http"
      xmlns:test="http://www.mulesoft.org/schema/mule/test"
      xsi:schemaLocation="
        http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/atom http://www.mulesoft.org/schema/mule/atom/current/mule-atom.xsd
        http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
        http://www.mulesoft.org/schema/mule/test http://www.mulesoft.org/schema/mule/test/current/mule-test.xsd">

    <flow name="feedConsumer">
        <poll frequency="1000">
            <http:outbound-endpoint address="http://topmanopensource.iteye.com/rss" method="GET"/>
        </poll>
        <atom:feed-splitter/>
    </flow>
</mule>

 

 

jms将消息发送特定的组件

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:atom="http://www.mulesoft.org/schema/mule/atom"
      xmlns:jms="http://www.mulesoft.org/schema/mule/jms"
       xmlns:spring="http://www.springframework.org/schema/beans"
      xmlns:test="http://www.mulesoft.org/schema/mule/test"
      xsi:schemaLocation="
             http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
               http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
               http://www.mulesoft.org/schema/mule/atom http://www.mulesoft.org/schema/mule/atom/current/mule-atom.xsd
               http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/current/mule-jms.xsd
               http://www.mulesoft.org/schema/mule/test http://www.mulesoft.org/schema/mule/test/current/mule-test.xsd">

   <jms:activemq-connector name="jmsConnectorNoRedelivery" maxRedelivery="-1" />
   
   <spring:beans>
     <spring:bean  id="feedConsumer" class="com.easyway.esb.mule.rss.FeedReceiver" />
       <spring:bean  id="entryReceiver" class="com.easyway.esb.mule.rss.EntryReceiver" />
   </spring:beans>
    <flow name="feedConsumerFlow">
        <jms:inbound-endpoint queue="feed.in" connector-ref="jmsConnectorNoRedelivery"/>

        <component>
            <spring-object  bean="feedConsumer"/>
        </component>
    </flow>

    <flow name="feedSplitterConsumerFlow">
        <jms:inbound-endpoint queue="feed.split.in" connector-ref="jmsConnectorNoRedelivery">
            <atom:feed-splitter/>
        </jms:inbound-endpoint>
        <component>
        <spring-object  bean="entryReceiver"/>
        </component>
    </flow>

</mule>

 

 

 

 

 

package com.easyway.esb.mule.rss;

import org.mule.api.annotations.expressions.Expr;
import org.mule.api.annotations.param.Payload;

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.abdera.model.Entry;
import org.apache.abdera.model.Feed;

public class EntryReceiver
{

    private AtomicInteger receivedEntries = new AtomicInteger(0);

    public void processEntry(@Payload Entry entry, @Expr("#[header:invocation:feed.object]") Feed feed) throws Exception
    {
        System.out.println("Received " + receivedEntries.incrementAndGet() + " of " + feed.getEntries().size() + " entries");
    }

    public int getCount()
    {
        return receivedEntries.get();
    }

    public AtomicInteger getReceivedEntries()
    {
        return receivedEntries;
    }
}

 

 

package com.easyway.esb.mule.rss;

import org.mule.api.annotations.param.Payload;

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.abdera.model.Feed;

public class FeedReceiver
{

    private final AtomicInteger receivedEntries = new AtomicInteger(0);

    public void processFeed(@Payload Feed feed) throws Exception
    {
        receivedEntries.set(0);
        System.out.println("Received " + feed.getEntries().size() + " events");
        receivedEntries.set(feed.getEntries().size());
    }

    public int getCount()
    {
        return receivedEntries.get();
    }

    public AtomicInteger getReceivedEntries()
    {
        return receivedEntries;
    }
}

 

 

测试类:

import java.io.File;
import java.io.InputStream;
import java.io.StringReader;

import org.apache.commons.io.FileUtils;
import org.mule.api.MuleContext;
import org.mule.api.client.MuleClient;
import org.mule.api.context.MuleContextFactory;
import org.mule.config.spring.SpringXmlConfigurationBuilder;
import org.mule.context.DefaultMuleContextFactory;

public class MuleJmsAtomMain {
	public static void main(String[] args) {
     try {
	       String configFile = "jms-atom-consume.xml";
	        System.setProperty("mule.verbose.exceptions","true");
	        String[] configFileArr = new String[] {configFile };
	        MuleContextFactory muleContextFactory = new DefaultMuleContextFactory();
	        MuleContext muleContext = muleContextFactory
	                .createMuleContext(new SpringXmlConfigurationBuilder(configFileArr));
	        muleContext.start();
	        
		    MuleClient client = muleContext.getClient();
	        FeedReceiver component = (FeedReceiver)muleContext.getRegistry().get("feedConsumer");
	        
	        String path=MuleJmsAtomMain.class.getClassLoader().getResource("./sample-feed.atom").getFile();
	       
	        String feed=FileUtils.readFileToString(new File(path));
	        client.dispatch("jms://feed.in", feed, null);
	        Thread.sleep(2000);
	        System.out.println(component.getCount());
	        
	         client = muleContext.getClient();
	         component = (FeedReceiver)muleContext.getRegistry().get("feedConsumer");
	        component.getReceivedEntries().set(0); //reset since the build reports that it's getting incremented someplace else
	        client.dispatch("jms://feed.split.in", feed, null);
	        Thread.sleep(5000);                
	        System.out.println(component.getCount());
	} catch (Exception e) {
		// TODO: handle exception
		e.printStackTrace();
	}
	}

}