欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

ActiveMQ消息中间件

程序员文章站 2022-07-01 15:31:54
...

对于这次ActiveMQ的学习,故做一次学习记录,面试的时候能翻翻看看。
先导入pom.xml

 <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.9</version>
</dependency>

生产者代码

 		ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("message_queue");
        javax.jms.MessageProducer producer = session.createProducer(queue);


        for(int i=0;i<10;i++){
            producer.send(session.createTextMessage("消息队列【"+i+"】"));
        }
        session.commit();
        connection.close();

消费者代码

		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = factory.createConnection();
        connection.start();
		// 第一个参数代表是否开启事务,第二个参数代表传递参数
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Queue message_queue = session.createQueue("message_queue");
        MessageConsumer consumer = session.createConsumer(message_queue);
        consumer.setMessageListener(new MessageListener() {

            public void onMessage(Message message) {
                try {
                    TextMessage textMessage = (TextMessage) message;
                    System.err.println("通道1:"+textMessage.getText());
                } catch (Exception e) {
                    e.printStackTrace();
                }


            }
        });

那些情况会引起消息重发?


1.消费端开启事务,调用了 session.rollback();
2:消费端开启事务,没有调用session.commit();
3:消费的使用Session.CLIENT_ACKNOWLEDGE传递模式,调用session.recover();


消息重发时间间隔和重发次数


默认时间间隔:1000ms
默认重发次数:6
消费者可以自定义重发策略,次数达到后,将进入ActiveMQ.DLQ(死信队列)

		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        // 设置重试次数,这里为2,实际为3次重试,重试还没成功接收到消息,直接进入死信队列
        redeliveryPolicy.setMaximumRedeliveries(2);
        // 设置初始重试等待时间
        redeliveryPolicy.setInitialRedeliveryDelay(2000);
        // 设置重发策略
        factory.setRedeliveryPolicy(redeliveryPolicy);