消息队列之简单队列
程序员文章站
2024-03-22 09:23:22
...
简单队列
1、简单队列的使用
pom添加依赖
<!-- RabbitMQ Java client (amqp-client) dependency -->
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
//连接工具类
/**
 * Static helper that opens connections to the RabbitMQ broker used by these examples.
 *
 * <p>NOTE(review): the broker address and credentials below are hard-coded in source.
 * That is acceptable for a throwaway demo only; real code should load them from
 * external configuration and must never commit a password.
 */
public final class ConnectionUtils {

    /** Broker host address. */
    private static final String HOST = "8.129.166.53";

    /** Broker port used for AMQP traffic. */
    private static final int PORT = 5672;

    /** Virtual host the connection is opened against. */
    private static final String VIRTUAL_HOST = "/my_virtual";

    /** Account used to authenticate against the broker. */
    private static final String USERNAME = "wjm";

    /** Password for {@link #USERNAME}. See the class-level review note. */
    private static final String PASSWORD = "qwe123";

    /** Not instantiable: this class only exposes static helpers. */
    private ConnectionUtils() {
    }

    /**
     * Opens a new connection to the configured broker.
     *
     * <p>A fresh {@code ConnectionFactory} and a fresh connection are created on every
     * call; the caller owns the returned connection and is responsible for closing it.
     *
     * @return a newly opened connection
     * @throws IOException if {@code ConnectionFactory.newConnection()} fails with an I/O error
     * @throws TimeoutException if {@code ConnectionFactory.newConnection()} times out
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        // Build a connection factory and point it at the broker.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setVirtualHost(VIRTUAL_HOST);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        return factory.newConnection();
    }
}
//测试类
package com.study;
import com.rabbitmq.client.*;
import com.study.rabbitmq.util.ConnectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
 * Integration tests for the RabbitMQ "simple queue" pattern: one producer publishes
 * to a named queue and one consumer reads from that same queue.
 *
 * <p>Both tests open a real connection through {@code ConnectionUtils.getConnection()},
 * so they only work when that broker is reachable.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:spring-bean.xml"})
public class TestRabbitMQ {

    private static final Logger logger = LogManager.getLogger(LogManager.ROOT_LOGGER_NAME);

    /** Queue shared by the producer and the consumer test. */
    private static final String QUEUE_NAME = "test_simple_queue";

    /** Upper bound, in seconds, on how long {@link #testGet()} waits for a delivery. */
    private static final long RECEIVE_TIMEOUT_SECONDS = 5;

    /**
     * Publishes a single text message to {@link #QUEUE_NAME}.
     *
     * <p>The channel and connection are managed by try-with-resources so they are
     * released even when declaring the queue or publishing throws.
     *
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     */
    @Test
    public void testSend() throws IOException, TimeoutException {
        try (Connection connection = ConnectionUtils.getConnection();
             Channel channel = connection.createChannel()) {
            // Declare the queue: non-durable, non-exclusive, no auto-delete, no extra arguments.
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String msg = "第一次测试数据!";
            // Encode explicitly as UTF-8: the consumer decodes as UTF-8, and the
            // no-argument getBytes() would use the platform default charset instead.
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("--send msg:" + msg);
        }
    }

    /**
     * Registers a consumer on {@link #QUEUE_NAME} and prints/logs the first message
     * it receives.
     *
     * <p>Deliveries arrive asynchronously on a client thread, so the test blocks on a
     * latch for up to {@link #RECEIVE_TIMEOUT_SECONDS} seconds before closing the
     * channel and connection. Without that wait the method would return (and the
     * resources would be torn down) before the callback had a chance to run.
     * An empty queue is not treated as a failure; it is only logged.
     *
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     */
    @Test
    public void testGet() throws IOException, TimeoutException {
        final CountDownLatch delivered = new CountDownLatch(1);
        try (Connection connection = ConnectionUtils.getConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msgString = new String(body, StandardCharsets.UTF_8);
                    logger.info(msgString);
                    System.out.println("消费者获取消息:" + msgString);
                    delivered.countDown();
                }
            };
            // Start listening on the queue; the second argument enables auto-acknowledge.
            channel.basicConsume(QUEUE_NAME, true, consumer);
            try {
                if (!delivered.await(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                    logger.warn("No message received from {} within {} s",
                            QUEUE_NAME, RECEIVE_TIMEOUT_SECONDS);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and fall through so resources still close.
                Thread.currentThread().interrupt();
            }
        }
    }
}
运行 testSend 后，登录 RabbitMQ 网页管理端，可在 Queues 页面中查看 test_simple_queue 队列及其中的消息
上一篇: FreeRTOS 消息队列总结
下一篇: hash(散列)——思想、编码应用