欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RabbitMq之Publish/Subscribe

程序员文章站 2022-06-04 10:32:59
...

在之前的例子中,rabbit传递消息的形式如下:

1.send发送消息到指定的一个队列中

2.队列充当一个消息存储容器.

3.consumer从队列中消费消息.

当存在多个consumer消费者的时候,rabbitmq会比较平均的分配消息给每个consumer,也就是说,每个consumer获取的消息都是队列消息的一个子集.

而在发布/订阅这种模式中,消息传递模型的核心思想是,生产者不发送任何信息直接到队列。事实上,生产者也不知道消息是否会被传送到任何队列。 rabbitmq引入了一个中间者exchanges. 

Exchanges

1.send发送消息到指定的一个exchanges中

2.可以将任意的队列绑定到exchanges中,并且消息会随着exchanges的广播发送到队列中

3.队列充当一个消息存储容器.

4.consumer从队列中消费消息.

生产者只能发送消息到Exchange。exchanges是件很简单的事。一方面它接收来自生产者和另一方的消息,并将它们推送到队列中。Exchange必须知道它接收哪些消息。是否应该附加到特定队列?它应该被附加到多个队列吗?或者它应该被丢弃。该规则由Exchange类型定义。 

exchanges有几种类型:direct, topic, headers and fanout

更多请查看:http://blog.csdn.net/yzr_java/article/details/70916084

Listing exchanges

可以在命令行中监听exchanges:    rabbitmqctl list_exchanges

Temporary queues

临时队列:由rabbitmq服务器自动创建的队列,当连接关闭时,临时队列将会被自动删除.

exchanges可以配合使用临时队列,将消息广播到临时队列中.

//服务器创建队列
String queueName = channel.queueDeclare().getQueue();

Bindings

将队列绑定到exchanges中,当exchanges进行广播的时候,被绑定的队列将会接收到消息.

//将队列绑定到exchange
channel.queueBind(queueName, EXCHANGE_NAME, "");

Listing bindings

可以使用命令行监听: rabbitmqctl list_bindings

package yzr.main;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class ReceiveLogs {

	private static final String EXCHANGE_NAME = "logs";

	  public static void main(String[] argv) throws Exception {
	    ConnectionFactory factory = new ConnectionFactory();
	    factory.setHost("localhost");
	    Connection connection = factory.newConnection();
	    Channel channel = connection.createChannel();
	    //创建exchange
            //fanout将消息广播到队列中
	    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
	    //服务器创建队列
	    String queueName = channel.queueDeclare().getQueue();
	    //将队列绑定到exchange
	    channel.queueBind(queueName, EXCHANGE_NAME, "");

	    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

	    Consumer consumer = new DefaultConsumer(channel) {
	      @Override
	      public void handleDelivery(String consumerTag, Envelope envelope,
	                                 AMQP.BasicProperties properties, byte[] body) throws IOException {
	        String message = new String(body, "UTF-8");
	        System.out.println(" [x] Received '" + message + "'");
	      }
	    };
	    channel.basicConsume(queueName, true, consumer);
	  }

}


测试:

启动上面的ReceiveLogs类,然后http://localhost:15672登陆之后看一下exchanges视图:

RabbitMq之Publish/Subscribe

然后再看一下队列视图:

RabbitMq之Publish/Subscribe

发送消息:

 [x] Sent '111 message.'
 [x] Sent '211 message..'
 [x] Sent '311 message...'
 [x] Sent '411 message....'
 [x] Sent '511 message.....'
 [x] Sent '611 message.....'
 [x] Sent '711 message.....'
 [x] Sent '811 message.....'
 [x] Sent '911 message.....'

RabbitMq之Publish/Subscribe