【RabbitMQ消息中间件】4.简单队列
程序员文章站
2022-07-14 22:46:18
...
安装和启动好了RabbitMQ后,我们下面来通过编写一个Java程序来学习第一个队列知识。
第一篇我们提到过,RabbitMQ一共有六种队列模式:
分别是“HelloWorld”简单队列模式、“Work queues”工作队列模式、“Publish/Subscribe”发布/订阅模式、“Routing”路由模式、“Topics”通配符模式、“RPC”远程服务调用。
那么我们首先从最基本的“HelloWorld”简单队列讲解。关于“HelloWorld”简单队列的队列模式图如下所示:
其中“P”代表消息的生产者(Producter),由它来产生消息。而中间红色的框就是队列,即是生产者产生消息之后,将消息发送到队列。后面的“C”代表消息的消费者(consumer),即是将生产者发送至消息队列中的消息进行消费(取出)。
这就是最简单的一个队列模式。
我们下面使用Java连接RabbitMQ,并实现简单队列。
首先创建一个maven工程:
上面我们的账号信息使用了一开始创建的jack账号的信息。
然后创建消息的生产者和消费者的模拟类。新建名为“Send”的类,作为生产者:
然后使用channel对象的“queueDeclare”方法声明(创建)了一个“队列”,然后在参数中指定队列的名称(这里指定为“test_queue”)。
然后创建一个消息,通过channel对象的“basicPublish”方法将消息发送至“test_queue”队列。
最后关闭通道和连接。
运行该生产者:
然后我们可以在RabbitMQ的管理工具的Queues模块中查看队列信息:
点击队列名称,然后选择“Get messages”选择,点击按钮,可以查看队列中信息的详细内容:
图4.10.png
然后创建一个名为“Recv”的类,作为消息消费者:
然后定义队列的消费者QueueingConsumer,然后使用“basicConsume”来监听队列。后面我们写了一个死循环,用于一致监听队列并获取消息。在循环块中使用consumer对象的nextDelivery()方法来进行消息的接收和消费。
这里要注意的是,在该模式下,消费者接收的消息一旦被消费,则队列中就不再有此消息(相当于“阅后即焚”)。在运行消费者之前,观察队列的情况:
运行消费者之后,在控制台可以看到消费者接收到的信息:
并且在管理工具中可以看到队列中的消息已经被消费而不存在:
第一篇我们提到过,RabbitMQ一共有六种队列模式:
分别是“HelloWorld”简单队列模式、“Work queues”工作队列模式、“Publish/Subscribe”发布/订阅模式、“Routing”路由模式、“Topics”通配符模式、“RPC”远程服务调用。
那么我们首先从最基本的“HelloWorld”简单队列讲解。关于“HelloWorld”简单队列的队列模式图如下所示:
其中“P”代表消息的生产者(Producter),由它来产生消息。而中间红色的框就是队列,即是生产者产生消息之后,将消息发送到队列。后面的“C”代表消息的消费者(consumer),即是将生产者发送至消息队列中的消息进行消费(取出)。
这就是最简单的一个队列模式。
我们下面使用Java连接RabbitMQ,并实现简单队列。
首先创建一个maven工程:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.jack.rabbitmq</groupId>
<artifactId>RabbitMQ_Test_project</artifactId>
<version>0.0.1-SNAPSHOT</version>
<build/>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.4.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.4.0.RELEASE</version>
</dependency>
</dependencies>
</project>
然后在src/mian/java中创建一个连接工厂ConnectionUtil,用于连接RabbitMQ:
package cn.jack.rabbitmq.connection;
import java.io.IOException;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
public static Connection getConnection() throws IOException{
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//定义连接地址
factory.setHost("localHost");
//定义端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("/jack");
factory.setUsername("jack");
factory.setPassword("jack");
// 通过工厂获取连接
Connection connection = factory.newConnection();
return connection;
}
}
在该类中首先定义了一个连接工厂“ConnectionFactory”,然后设置其服务地址、端口号、账号信息(用户名、密码、vhost),最后通过连接工厂获取一个RabbitMQ的连接对象。上面我们的账号信息使用了一开始创建的jack账号的信息。
然后创建消息的生产者和消费者的模拟类。新建名为“Send”的类,作为生产者:
package cn.jack.rabbitmq.simple;
import java.io.IOException;
import cn.jack.rabbitmq.connection.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Send {
private final static String QUEUE_NAME="test_queue";
public static void main(String[] args) throws IOException {
//获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
//从连接中创建通道
Channel channel = connection.createChannel();
//声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//消息内容
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("[product] Send '"+ message +"'");
//关闭通道和连接
channel.close();
connection.close();
}
}
该类首先通过连接工具类获取到了RabbitMQ的连接对象,类似JDBC连接数据库的连接对象。然后通过连接对象获取Channel通道,这个就相当于获取JDBC的Statement对象。通过连接对象创建Channel对象,相当于创建了一个与RabbitMQ的“通道”,通过“通道”可以做一系列与RabbitMQ交互的操作。然后使用channel对象的“queueDeclare”方法声明(创建)了一个“队列”,然后在参数中指定队列的名称(这里指定为“test_queue”)。
然后创建一个消息,通过channel对象的“basicPublish”方法将消息发送至“test_queue”队列。
最后关闭通道和连接。
运行该生产者:
然后我们可以在RabbitMQ的管理工具的Queues模块中查看队列信息:
点击队列名称,然后选择“Get messages”选择,点击按钮,可以查看队列中信息的详细内容:
图4.10.png
然后创建一个名为“Recv”的类,作为消息消费者:
package cn.jack.rabbitmq.simple;
import cn.jack.rabbitmq.connection.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
public class Recv {
private final static String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws Exception {
//获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
//监听队列
channel.basicConsume(QUEUE_NAME, true,consumer);
//获取消息
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("[Consumer] Received '"+ message +"'");
}
}
}
与生产者一样,首先获取连接对象和创建channel通道对象。我们这个消费者消费的是生产者的队列,上面的生产者已经创建好了改队列,该理说不需要再创建队列,但是为了防止消费者消费的队列不存在这种异常的发送,还是同样声明生产者一样的队列,以防万一(如果已存在,该次创建无效,不会影响已存在的相同的队列)。如果开发者明确改队列百分之百存在,则可以在消费者代码中忽略声明队列这一步。然后定义队列的消费者QueueingConsumer,然后使用“basicConsume”来监听队列。后面我们写了一个死循环,用于一致监听队列并获取消息。在循环块中使用consumer对象的nextDelivery()方法来进行消息的接收和消费。
这里要注意的是,在该模式下,消费者接收的消息一旦被消费,则队列中就不再有此消息(相当于“阅后即焚”)。在运行消费者之前,观察队列的情况:
运行消费者之后,在控制台可以看到消费者接收到的信息:
并且在管理工具中可以看到队列中的消息已经被消费而不存在:
以上就是RabbitMQ中最简单的消息队列模式,即是生产者生产消息并发送至消息队列,消费者监听队列并从队列中获取消息。
转载请注明出处:http://blog.csdn.net/acmman/article/details/79438196
上一篇: java定时器
下一篇: 专注互联网,大数据,云计算