欢迎您访问程序员文章站!本站旨在为大家提供和分享程序员计算机编程知识。
您现在的位置是: 首页

原创:jfinal2.0整合activemq

程序员文章站 2022-06-13 17:50:58
...

     最近项目中需要用jfinal整合activemq,在百度上查了一下,发现jfinal没有提供这个插件,所以自己简单地封装了一下,跑起来确实可以用,在这里把源码分享给大家。不喜勿喷,也希望大家给我提一些意见!

     github地址: https://github.com/javaxiaoyetan/istudy.git

   

1.项目目录截图

             原创:jfinal2.0整合activemq
            
    
    博客分类: jfinal jfinalactivemq 

         

 2.pom文件添加依赖

   

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Build descriptor for the JFinal 2.0 + ActiveMQ sample web application (packaged as a WAR). -->
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xiaoyetanmodules</groupId>
    <artifactId>jfinalAvtivemq</artifactId>
    <packaging>war</packaging>
    <version>1.0</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <jdk.version>1.8</jdk.version>
        <!-- Apply jdk.version to the compiler; without these two properties the
             value above is declared but never used by the build. -->
        <maven.compiler.source>${jdk.version}</maven.compiler.source>
        <maven.compiler.target>${jdk.version}</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- jfinal: resolved from a jar checked in under WEB-INF/lib (system scope),
             so it is not downloaded from a repository. -->
        <dependency>
            <groupId>jfinal</groupId>
            <artifactId>jfinal</artifactId>
            <version>2.0</version>
            <scope>system</scope>
            <systemPath>${basedir}/src/main/webapp/WEB-INF/lib/jfinal-2.0-bin-with-src.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>com.jfinal</groupId>
            <artifactId>cos</artifactId>
            <version>26Dec2008</version>
        </dependency>

        <!-- logback -->
        <!-- NOTE(review): the Java sources shown alongside this POM import
             org.apache.log4j.Logger, but no log4j artifact is declared here.
             Presumably those classes arrive via activemq-all; confirm, and check
             for duplicate SLF4J bindings next to logback-classic. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.10</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.1.2</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.1.2</version>
        </dependency>
        <!-- mq -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.13.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.1</version>
        </dependency>

        <!-- NOTE(review): fastjson 1.1.15 is an old release; consider checking it
             against published security advisories before production use. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.1.15</version>
        </dependency>
    </dependencies>


</project>

 

 3.ActivemqPlugin 

package com.xiaoyetan.common.mq.plugins;

import java.util.HashMap;
import java.util.Map;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.activemq.thread.TaskRunnerFactory;
import com.jfinal.plugin.IPlugin;
import org.apache.log4j.Logger;

import javax.jms.*;

/**
 * JFinal plugin that owns one pooled ActiveMQ connection and one JMS session,
 * and offers helpers to register queue listeners and send text messages.
 *
 * <p>All methods that touch the shared connection, session, queue map or
 * producer map are synchronized on this instance, because they mutate
 * non-thread-safe {@link HashMap}s and use a single shared {@link Session}.
 *
 * @Author xiaoyetan
 * @Date :created on 17:26 2017/8/23
 */
public class ActivemqPlugin implements IPlugin{
    private final static Logger log = Logger.getLogger(ActivemqPlugin.class);

    /** Broker URL handed to {@link ActiveMQConnectionFactory} when the factory is built. */
    private String brokerUrl;

    private ActiveMQConnectionFactory targetFactory;

    private PooledConnectionFactory pooledConFactory;

    /** Lazily created by {@link #createCon()}; released by {@link #stop()}. */
    private Connection connection;

    /** Lazily created by {@link #createCon()}; released by {@link #stop()}. */
    private Session session;

    /** Queue name to destination; guarded by {@code this}. */
    private final Map<String, Queue> queueMap = new HashMap<String, Queue>();

    /** Queue name to cached producer; guarded by {@code this}. */
    private final Map<String, MessageProducer> queueProducer = new HashMap<String, MessageProducer>();

    /**
     * Creates the plugin and eagerly builds the connection factories.
     *
     * @param brokerUrl broker URL passed to {@link ActiveMQConnectionFactory}
     */
    public ActivemqPlugin(String brokerUrl) {
        this.brokerUrl = brokerUrl;
        try {
            init();
        } catch (JMSException e) {
            // Keep the original best-effort behaviour (construction does not fail),
            // but report through the logger instead of printStackTrace().
            log.error("Failed to initialise the ActiveMQ connection factory", e);
        }

    }


    /**
     * Builds the ActiveMQ connection factory and its pooled wrapper.
     * Does nothing when the pooled factory already exists.
     *
     * @throws JMSException declared for callers; kept from the original signature
     */
    public synchronized void init() throws JMSException {
        if (null != pooledConFactory) {
            return;
        }
        targetFactory = new ActiveMQConnectionFactory(brokerUrl);
        targetFactory.setUseAsyncSend(true);
        TaskRunnerFactory taskRunnerFactory = new TaskRunnerFactory();
        taskRunnerFactory.setMaxIterationsPerRun(2);
        taskRunnerFactory.setMaxThreadPoolSize(10);
        taskRunnerFactory.setShutdownAwaitTermination(10);
        taskRunnerFactory.setDaemon(false);
        targetFactory.setSessionTaskRunner(taskRunnerFactory);
        pooledConFactory = new PooledConnectionFactory(targetFactory);
        pooledConFactory.setMaxConnections(200);
    }

    /**
     * Returns the shared session, creating and starting the connection and
     * creating the session on first use.
     *
     * @return the shared, non-transacted, auto-acknowledge session
     * @throws JMSException if the connection or session cannot be created
     */
    public synchronized Session createCon() throws JMSException {
        // Rebuild the factories if stop() released them.
        init();
        if (null == connection) {
            Connection created = pooledConFactory.createConnection();
            try {
                created.start();
            } catch (JMSException e) {
                // Do not cache a connection that failed to start; otherwise
                // later calls would skip start() and reuse it.
                closeQuietly(created);
                throw e;
            }
            connection = created;
        }
        if (null == session) {
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        }
        return session;
    }

    /**
     * Registers a queue destination under the given name if not yet known.
     *
     * @param queueName name of the queue; must not be null or blank
     * @throws RuntimeException if {@code queueName} is null or blank
     */
    public synchronized void addQueue(String queueName) {
        if (null == queueName || queueName.trim().isEmpty()) {
            throw new RuntimeException("Queue name parameter is blank!");
        }
        if (!queueMap.containsKey(queueName)) {
            queueMap.put(queueName, new ActiveMQQueue(queueName));
        }
    }

    /**
     * Attaches a message listener to the named queue on the shared session.
     *
     * @param queneName   name of the queue to consume from
     * @param msgListenre listener that receives the messages
     * @throws JMSException if the consumer cannot be created or configured
     */
    public synchronized void addQueneMessageListener(String queneName, MessageListener msgListenre) throws JMSException {
        addQueue(queneName);
        MessageConsumer consumer = createCon().createConsumer(queueMap.get(queneName));
        consumer.setMessageListener(msgListenre);
    }


    /**
     * Sends a text message to the named queue.
     *
     * @param queueName name of the target queue
     * @param msg       message text
     * @throws JMSException if the producer or message cannot be created or sent
     */
    public synchronized void sendQueueMsg(String queueName, String msg) throws JMSException {
        addQueue(queueName);
        Session activeSession = createCon();
        // Create one producer per queue and reuse it for later messages.
        MessageProducer producer = queueProducer.get(queueName);
        if (null == producer) {
            producer = activeSession.createProducer(queueMap.get(queueName));
            queueProducer.put(queueName, producer);
        }
        //producer.setTimeToLive(time);
        producer.send(activeSession.createTextMessage(msg));
    }

    /**
     * Stores a new broker URL. The value is only read when the factories are
     * built in {@link #init()}, so it does not affect factories that already exist.
     */
    public void setBrokerUrl(String brokerUrl) {
        this.brokerUrl = brokerUrl;
    }

    public String getBrokerUrl() {
        return brokerUrl;
    }

    public synchronized Connection getConnection() {
        return connection;
    }

    public synchronized void setConnection(Connection connection) {
        this.connection = connection;
    }

    public synchronized Session getSession() {
        return session;
    }

    public synchronized void setSession(Session session) {
        this.session = session;
    }

    /**
     * Releases the cached producers, the session, the connection and the
     * pooled factory. The factories are rebuilt on the next {@link #createCon()}.
     *
     * @return {@code true} when everything closed without error,
     *         {@code false} if any close call failed
     */
    public synchronized boolean stop() {
        boolean clean = true;
        for (MessageProducer producer : queueProducer.values()) {
            try {
                producer.close();
            } catch (JMSException e) {
                clean = false;
                log.warn("Failed to close ActiveMQ producer", e);
            }
        }
        queueProducer.clear();

        if (null != session) {
            try {
                session.close();
            } catch (JMSException e) {
                clean = false;
                log.warn("Failed to close ActiveMQ session", e);
            } finally {
                session = null;
            }
        }
        if (null != connection) {
            try {
                connection.close();
            } catch (JMSException e) {
                clean = false;
                log.warn("Failed to close ActiveMQ connection", e);
            } finally {
                connection = null;
            }
        }
        if (null != pooledConFactory) {
            pooledConFactory.stop();
            pooledConFactory = null;
            targetFactory = null;
        }
        return clean;
    }

    /**
     * Starts the plugin. When no session exists yet, an {@code ActivemqService}
     * is instantiated and discarded; presumably this is done to trigger that
     * class's static initialisation (confirm against ActivemqService).
     *
     * @return always {@code true}
     */
    public boolean start() {
        if (null != session) {
            log.info("activemq isStarted>>>");
            return true;
        }
        new ActivemqService();
        return true;
    }

    /** Closes a connection during error handling, logging instead of throwing. */
    private static void closeQuietly(Connection toClose) {
        try {
            toClose.close();
        } catch (JMSException e) {
            log.warn("Failed to close ActiveMQ connection after start failure", e);
        }
    }

}

 

   4.ActivemqService 

package com.xiaoyetan.common.mq.plugins;

import com.xiaoyetan.Const;
import com.xiaoyetan.common.mq.CallCrmUnsubListener;
import org.apache.log4j.Logger;

import javax.jms.JMSException;


/**
 * Static facade around a single {@link ActivemqPlugin} instance.
 *
 * <p>On class initialisation it builds the plugin from the
 * {@code mq.brokerURL} setting and registers a {@link CallCrmUnsubListener}
 * on the queue named by the {@code crm.unsub.queue} setting.
 *
 * @Author xiaoyetan
 * @Date :created on 17:28 2017/8/23
 */
public class ActivemqService {

    private final static Logger log = Logger.getLogger(ActivemqService.class);

    private static final String brokerUrl = Const.getValue("mq.brokerURL");
    public static final String CrmUnsubQueueName=Const.getValue("crm.unsub.queue");

    private static ActivemqPlugin mq ;

    static{
        try {
            log.info("ActivemqService static >>>");
            mq= new ActivemqPlugin(brokerUrl);
            mq.addQueneMessageListener(CrmUnsubQueueName, new CallCrmUnsubListener());
            log.info("ActivemqService static end>>>");
        } catch (JMSException e) {
            // Still best-effort (class loading does not fail), but the failure
            // now goes to the logger with its cause instead of printStackTrace().
            log.error("Failed to register listener on queue " + CrmUnsubQueueName, e);
        }
    }

    /**
     * Returns the shared MQ plugin instance.
     *
     * @return the plugin created during class initialisation
     */
    public static ActivemqPlugin MQ(){
        return mq;
    }

    /**
     * Sends a text message to the given queue. Failures are logged and not
     * propagated to the caller.
     *
     * @param queueName name of the target queue
     * @param msgJSON   message payload, logged at info level after a successful send
     */
    public static void sendMessage(String queueName, final String msgJSON) {
        try {
            mq.sendQueueMsg(queueName,msgJSON);
            log.info(msgJSON);
        }catch (Exception e){
            log.error("Failed to send message to queue " + queueName, e);
        }

    }

}

5.JfinalCoreConfig

package com.xiaoyetan.jfinal;

import com.jfinal.config.*;
import com.jfinal.kit.PropKit;
import com.xiaoyetan.Const;
import com.xiaoyetan.common.mq.plugins.ActivemqPlugin;

/**
 * @Author xiaoyetan
 * @Date :created on 17:22 2017/8/23
 */
public class JfinalCoreConfig extends JFinalConfig {
    //配置常量
    public void configConstant(Constants me) {
        PropKit.use("config.properties");
    }

    //配置路由
    public void configRoute(Routes me) {
    }

    //配置插件
    public void configPlugin(Plugins me) {
        ActivemqPlugin mqPlugin = new ActivemqPlugin(Const.getValue("mq.brokerURL"));
        me.add(mqPlugin);
    }

    //配置拦截器
    public void configInterceptor(Interceptors me) {

    }

    //配置处理器
    public void configHandler(Handlers me) {
    }

}

 

         

  • 原创:jfinal2.0整合activemq
            
    
    博客分类: jfinal jfinalactivemq 
  • 大小: 37.8 KB
相关标签: jfinal activemq