欢迎您访问程序员文章站！本站旨在为大家提供并分享程序员计算机编程知识！
您现在的位置是: 首页

消息队列之简单队列

程序员文章站 2024-03-22 09:23:22
...

简单队列

1、简单队列的使用

在 pom.xml 中添加依赖：

    <!-- Add the RabbitMQ Java client (amqp-client) dependency -->
    <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.8.0</version>
    </dependency>
// Connection utility class
/**
 * Static helper that opens connections to the RabbitMQ broker used by these examples.
 *
 * <p>Every setting defaults to the value this class has always used and can be overridden
 * with a JVM system property, so credentials need not live in source control:
 * {@code rabbitmq.host}, {@code rabbitmq.port}, {@code rabbitmq.vhost},
 * {@code rabbitmq.username} and {@code rabbitmq.password}.
 */
public final class ConnectionUtils {

    /** Not instantiable: the class only exposes a static factory method. */
    private ConnectionUtils() {
    }

    /**
     * Opens a new connection to the configured RabbitMQ broker.
     *
     * <p>The caller owns the returned connection and is responsible for closing it.
     *
     * @return a newly created connection
     * @throws IOException      propagated from {@code ConnectionFactory.newConnection()}
     * @throws TimeoutException propagated from {@code ConnectionFactory.newConnection()}
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        // Create the connection factory.
        ConnectionFactory factory = new ConnectionFactory();
        // Broker address.
        factory.setHost(System.getProperty("rabbitmq.host", "8.129.166.53"));
        // AMQP port; Integer.getInteger falls back to 5672 when the property is unset or not a number.
        factory.setPort(Integer.getInteger("rabbitmq.port", 5672));
        // Virtual host.
        factory.setVirtualHost(System.getProperty("rabbitmq.vhost", "/my_virtual"));
        // User name.
        factory.setUsername(System.getProperty("rabbitmq.username", "wjm"));
        // Password.
        // NOTE(review): the fallback password is still embedded in source; supply
        // -Drabbitmq.password=... for any non-demo environment.
        factory.setPassword(System.getProperty("rabbitmq.password", "qwe123"));
        return factory.newConnection();
    }
}
//测试类
package com.study;

import com.rabbitmq.client.*;
import com.study.rabbitmq.util.ConnectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Simple-queue example: {@link #testSend()} publishes one message to
 * {@code test_simple_queue} and {@link #testGet()} consumes from the same queue.
 *
 * <p>Both tests need a reachable RabbitMQ broker as configured in {@code ConnectionUtils}.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:spring-bean.xml"})
public class TestRabbitMQ {
    private static final Logger logger = LogManager.getLogger(LogManager.ROOT_LOGGER_NAME);
    private static final String QUEUE_NAME = "test_simple_queue";
    /** Upper bound, in seconds, on how long {@link #testGet()} waits for a delivery. */
    private static final long RECEIVE_TIMEOUT_SECONDS = 10;

    /**
     * Publishes a single text message to the queue through the default exchange.
     *
     * @throws IOException      if declaring the queue, publishing, or closing fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     */
    @Test
    public void testSend() throws IOException, TimeoutException {
        // try-with-resources closes the channel first, then the connection,
        // even when declaring or publishing throws.
        try (Connection connection = ConnectionUtils.getConnection();
             Channel channel = connection.createChannel()) {
            // Declare the queue (non-durable, non-exclusive, no auto-delete, no arguments).
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            String msg = "第一次测试数据!";
            // Encode explicitly as UTF-8 so the bytes match what the consumer decodes,
            // regardless of the platform default charset.
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("--send msg:" + msg);
        }
    }

    /**
     * Consumes from the queue with auto-acknowledgement and prints what it receives.
     *
     * <p>{@code basicConsume} only registers the consumer, so the test blocks until the first
     * delivery has been handled or {@link #RECEIVE_TIMEOUT_SECONDS} elapses, then closes the
     * channel and connection. Deliveries after the first may not be handled before the
     * channel closes.
     *
     * @throws IOException      if declaring the queue, consuming, or closing fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     */
    @Test
    public void testGet() throws IOException, TimeoutException {
        try (Connection connection = ConnectionUtils.getConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            // Released by the consumer callback once a message has been handled.
            final CountDownLatch delivered = new CountDownLatch(1);
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String msgString = new String(body, StandardCharsets.UTF_8);
                    logger.info(msgString);
                    System.out.println("消费者获取消息:" + msgString);
                    delivered.countDown();
                }
            };
            // Start listening on the queue; autoAck = true.
            channel.basicConsume(QUEUE_NAME, true, consumer);

            try {
                if (!delivered.await(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                    logger.warn("No message received from {} within {} s",
                            QUEUE_NAME, RECEIVE_TIMEOUT_SECONDS);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and fall through to resource cleanup.
                Thread.currentThread().interrupt();
            }
        }
    }
}

发送消息后，登录 RabbitMQ 网页管理界面，可在 Queues 页面中查看该队列及其中的消息。