欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

【RabbitMQ消息中间件】4.简单队列

程序员文章站 2022-07-14 22:46:18
...
安装和启动好了RabbitMQ后,我们下面来通过编写一个Java程序来学习第一个队列知识。

第一篇我们提到过,RabbitMQ一共有六种队列模式:
【RabbitMQ消息中间件】4.简单队列
分别是“HelloWorld”简单队列模式、“Work queues”工作队列模式、“Publish/Subscribe”发布/订阅模式、“Routing”路由模式、“Topics”通配符模式、“RPC”远程服务调用。

那么我们首先从最基本的“HelloWorld”简单队列讲解。关于“HelloWorld”简单队列的队列模式图如下所示:
【RabbitMQ消息中间件】4.简单队列
其中“P”代表消息的生产者(Producter),由它来产生消息。而中间红色的框就是队列,即是生产者产生消息之后,将消息发送到队列。后面的“C”代表消息的消费者(consumer),即是将生产者发送至消息队列中的消息进行消费(取出)。
这就是最简单的一个队列模式。

我们下面使用Java连接RabbitMQ,并实现简单队列。
首先创建一个maven工程:

【RabbitMQ消息中间件】4.简单队列

【RabbitMQ消息中间件】4.简单队列

【RabbitMQ消息中间件】4.简单队列

【RabbitMQ消息中间件】4.简单队列

首先需要在maven工程中的pom.xml加入RabbitMQ的依赖和相关其它依赖:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.jack.rabbitmq</groupId>
  <artifactId>RabbitMQ_Test_project</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <build/>
  
  <dependencies>
		<dependency>
		    <groupId>com.rabbitmq</groupId>
		    <artifactId>amqp-client</artifactId>
		    <version>3.4.1</version>
		</dependency>
		<dependency>
		    <groupId>org.slf4j</groupId>
		    <artifactId>slf4j-log4j12</artifactId>
		    <version>1.7.7</version>
		</dependency>
		<dependency>
		    <groupId>org.apache.commons</groupId>
		    <artifactId>commons-lang3</artifactId>
		    <version>3.3.2</version>
		</dependency>
		
		<dependency>
	            <groupId>org.springframework.amqp</groupId>
	            <artifactId>spring-rabbit</artifactId>
	            <version>1.4.0.RELEASE</version>
	        </dependency>
    </dependencies>
</project>
然后在src/mian/java中创建一个连接工厂ConnectionUtil,用于连接RabbitMQ:
【RabbitMQ消息中间件】4.简单队列
package cn.jack.rabbitmq.connection;
import java.io.IOException;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtil {
    public static Connection getConnection() throws IOException{
    	//定义连接工厂
    	ConnectionFactory factory = new ConnectionFactory();
    	//定义连接地址
    	factory.setHost("localHost");
    	//定义端口
    	factory.setPort(5672);
    	//设置账号信息,用户名、密码、vhost
    	factory.setVirtualHost("/jack");
    	factory.setUsername("jack");
    	factory.setPassword("jack");
    	// 通过工厂获取连接
    	Connection connection = factory.newConnection();
    	return connection;
    }
}
在该类中首先定义了一个连接工厂“ConnectionFactory”,然后设置其服务地址、端口号、账号信息(用户名、密码、vhost),最后通过连接工厂获取一个RabbitMQ的连接对象。
上面我们的账号信息使用了一开始创建的jack账号的信息。

然后创建消息的生产者和消费者的模拟类。新建名为“Send”的类,作为生产者:
【RabbitMQ消息中间件】4.简单队列
package cn.jack.rabbitmq.simple;
import java.io.IOException;
import cn.jack.rabbitmq.connection.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {
	
    private final static String QUEUE_NAME="test_queue";
    
    public static void main(String[] args) throws IOException {
		//获取到连接以及mq通道
    	Connection connection = ConnectionUtil.getConnection();
    	//从连接中创建通道
    	Channel channel = connection.createChannel();
    	
    	//声明(创建)队列
    	channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    	
    	//消息内容
    	String message = "Hello World!";
    	channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    	System.out.println("[product] Send '"+ message +"'");
    	
    	//关闭通道和连接
    	channel.close();
    	connection.close();
    }
}
该类首先通过连接工具类获取到了RabbitMQ的连接对象,类似JDBC连接数据库的连接对象。然后通过连接对象获取Channel通道,这个就相当于获取JDBC的Statement对象。通过连接对象创建Channel对象,相当于创建了一个与RabbitMQ的“通道”,通过“通道”可以做一系列与RabbitMQ交互的操作。
然后使用channel对象的“queueDeclare”方法声明(创建)了一个“队列”,然后在参数中指定队列的名称(这里指定为“test_queue”)。
然后创建一个消息,通过channel对象的“basicPublish”方法将消息发送至“test_queue”队列。
最后关闭通道和连接。

运行该生产者:
【RabbitMQ消息中间件】4.简单队列
然后我们可以在RabbitMQ的管理工具的Queues模块中查看队列信息:
【RabbitMQ消息中间件】4.简单队列
点击队列名称,然后选择“Get messages”选择,点击按钮,可以查看队列中信息的详细内容:
【RabbitMQ消息中间件】4.简单队列图4.10.png

然后创建一个名为“Recv”的类,作为消息消费者:
【RabbitMQ消息中间件】4.简单队列
package cn.jack.rabbitmq.simple;
import cn.jack.rabbitmq.connection.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {
    
	private final static String QUEUE_NAME = "test_queue";
	
	public static void main(String[] args) throws Exception {
		//获取到连接以及mq通道
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		
		//声明队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		
		//定义队列的消费者
		QueueingConsumer consumer = new QueueingConsumer(channel);
		//监听队列
		channel.basicConsume(QUEUE_NAME, true,consumer);
		
		//获取消息
		while(true){
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String message = new String(delivery.getBody());
			System.out.println("[Consumer] Received '"+ message +"'");
		}
	}
}
与生产者一样,首先获取连接对象和创建channel通道对象。我们这个消费者消费的是生产者的队列,上面的生产者已经创建好了改队列,该理说不需要再创建队列,但是为了防止消费者消费的队列不存在这种异常的发送,还是同样声明生产者一样的队列,以防万一(如果已存在,该次创建无效,不会影响已存在的相同的队列)。如果开发者明确改队列百分之百存在,则可以在消费者代码中忽略声明队列这一步。
然后定义队列的消费者QueueingConsumer,然后使用“basicConsume”来监听队列。后面我们写了一个死循环,用于一致监听队列并获取消息。在循环块中使用consumer对象的nextDelivery()方法来进行消息的接收和消费。

这里要注意的是,在该模式下,消费者接收的消息一旦被消费,则队列中就不再有此消息(相当于“阅后即焚”)。在运行消费者之前,观察队列的情况:
【RabbitMQ消息中间件】4.简单队列
运行消费者之后,在控制台可以看到消费者接收到的信息:
【RabbitMQ消息中间件】4.简单队列
并且在管理工具中可以看到队列中的消息已经被消费而不存在:
【RabbitMQ消息中间件】4.简单队列

以上就是RabbitMQ中最简单的消息队列模式,即是生产者生产消息并发送至消息队列,消费者监听队列并从队列中获取消息。

转载请注明出处:http://blog.csdn.net/acmman/article/details/79438196