欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

activemq点对点消息传送 博客分类: activemq activemq 

程序员文章站 2024-03-17 12:29:16
...
activemq点对点消息传送

消息中间件顾名思义实现的就是在两个系统或两个客户端之间进行消息传送

ActiveMQ是一种开源的基于JMS(Java Message Servie)规范的一种消息中间件的实现

ActiveMQ常被应用与系统业务的解耦,异步消息的推送,增加系统并发量,提高用户体验。

bin/activemq start

AcitveMQ的数据传送流程:
proceder->broker->consumer

ActiveMQ的两种消息传递类型:
(1)点对点传输,即一个生产者对应一个消费者,生产者向broke推送数据,数据存储在broke的一个队列中,消费者获取该条队列里的数据。
(2)基于发布/订阅模式的传输,即根据订阅话题来接收相应数据,一个生产者可向多个消费者推送数据

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Producter {

    //ActiveMq 的默认用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //ActiveMq 的默认登录密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //ActiveMQ 的链接地址
    //private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static final String BROKEN_URL = "failover://tcp://192.168.1.128:61616";
   
    AtomicInteger count = new AtomicInteger(0);
    //链接工厂
    ConnectionFactory connectionFactory;
    //链接对象
    Connection connection;
    //事务管理
    Session session;
    ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();

    public void init(){
        try {
            //创建一个链接工厂
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
            //从工厂中创建一个链接
            connection  = connectionFactory.createConnection();
            //开启链接
            connection.start();
            //创建一个事务(这里通过参数可以设置事务的级别)
            session = connection.createSession(true,Session.SESSION_TRANSACTED);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public void sendMessage(String disname){
        try {
            //创建一个消息队列
            Queue queue = session.createQueue(disname);
            //消息生产者
            MessageProducer messageProducer = null;
            if(threadLocal.get()!=null){
                messageProducer = threadLocal.get();
            }else{
                messageProducer = session.createProducer(queue);
                threadLocal.set(messageProducer);
            }
           while(true){
                Thread.sleep(1000);
                int num = count.getAndIncrement();
                //创建一条消息
                TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+
                        "productor:我是大帅哥,我现在正在生产东西!,count:"+num);
                System.out.println(Thread.currentThread().getName()+
                        "productor:我是大帅哥,我现在正在生产东西!,count:"+num);
                //发送消息
                messageProducer.send(msg);
                //提交事务
                session.commit();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class Consumer {

    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;

    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

    //private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static final String BROKEN_URL = "failover://tcp://192.168.1.128:61616";
   
    ConnectionFactory connectionFactory;

    Connection connection;

    Session session;

    ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();
    AtomicInteger count = new AtomicInteger();

    public void init(){
        try {
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
            connection  = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }


    public void getMessage(String disname){
        try {
            Queue queue = session.createQueue(disname);
            MessageConsumer consumer = null;

            if(threadLocal.get()!=null){
                consumer = threadLocal.get();
            }else{
                consumer = session.createConsumer(queue);
                threadLocal.set(consumer);
            }
            while(true){
                Thread.sleep(1000);
                System.out.println("..");
                TextMessage msg = (TextMessage) consumer.receive();
                System.out.println("....");
                if(msg!=null) {
                    msg.acknowledge();
                    System.out.println(Thread.currentThread().getName()+": Consumer:我是消费者,我正在消费Msg"+msg.getText()+"--->"+count.getAndIncrement());
                }else {
                    break;
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class ProducterTest {
    public static void main(String[] args){
        Producter producter = new Producter();
        producter.init();
        ProducterTest testMq = new ProducterTest();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //Thread 1
        new Thread(testMq.new ProductorMq(producter)).start();
        //Thread 2
        new Thread(testMq.new ProductorMq(producter)).start();
        //Thread 3
        new Thread(testMq.new ProductorMq(producter)).start();
        //Thread 4
        new Thread(testMq.new ProductorMq(producter)).start();
        //Thread 5
        new Thread(testMq.new ProductorMq(producter)).start();
    }

    private class ProductorMq implements Runnable{
        Producter producter;
        public ProductorMq(Producter producter){
            this.producter = producter;
        }

        @Override
        public void run() {
            while(true){
                try {
                    producter.sendMessage("Jaycekon-MQ");
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

//第一种消费消息的方式
public class ConsumerTest {
    public static void main(String[] args){
        Consumer comsumer = new Consumer();
        comsumer.init();
        ConsumerTest testConsumer = new ConsumerTest();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
    }

    private class ConsumerMq implements Runnable{
        Consumer comsumer;
        public ConsumerMq(Consumer comsumer){
            this.comsumer = comsumer;
        }

        @Override
        public void run() {
            while(true){
                try {
                    comsumer.getMessage("Jaycekon-MQ");
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

//第二种消费消息的方式
//点对点-使用监听器接收消息
public class ConsumerListener {
 
    // tcp 地址 服务器器端地址
    //public static final String BROKER_URL =ActiveMQConnection.DEFAULT_BROKER_URL;  // 其值为 "tcp://localhost:61616";
public static final String BROKER_URL = "tcp://192.168.1.128:61616";
// 目标地址,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/queues.jsp中可以查询到发送的mq消息
    public static final String DESTINATION = "Jaycekon-MQ";
    //测试连接使用默认的用户名
    public static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;//默认为null
    //测试连接使用默认的密码
    public static final String DEFAULT_PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认为null
    
    public static void run() throws Exception {
        
        QueueConnection connection = null;
        QueueSession session = null;
        try {
            // 1、创建链接工厂
            QueueConnectionFactory factory = new ActiveMQConnectionFactory(ConsumerListener.DEFAULT_USER, ConsumerListener.DEFAULT_PASSWORD, ConsumerListener.BROKER_URL);
            // 2、通过工厂创建一个连接
            connection = factory.createQueueConnection();
            // 3、启动连接
            connection.start();
            // 4、创建一个session会话
            session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 5、创建一个消息队列
            Queue queue = session.createQueue(DESTINATION);
            // 创建消息接收者
            javax.jms.QueueReceiver receiver = session.createReceiver(queue);
            
            //使用内部类为消息接收者加载相应的Listener监听
            receiver.setMessageListener(new MessageListener() {
                //重写onMessage方法
                public void onMessage(Message msg) {
                    if (msg != null) {
                        TextMessage textMessage = (TextMessage) msg;
                        try {
                            System.out.println("接收#" + textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            // 休眠10s再关闭 接收生产者发送的全部的10条消息
            // 需要注意的是这里使用sleep会使当前正在执行的线程进入休眠状态
            // 也就是QueueReceiver_Listener这个类进入休眠状态了,而接收者的监听器仍然会继续执行的哦。
            Thread.sleep(1000 * 10);
            
            // 提交会话
            session.commit();
            
        } catch (Exception e) {
            throw e;
        } finally {
            // 关闭释放资源
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
    
    public static void main(String[] args) throws Exception {
        ConsumerListener.run();
    }
}

相关标签: activemq