RabbitMq之Publish/Subscribe
在之前的例子中,rabbit传递消息的形式如下:
1.send发送消息到指定的一个队列中
2.队列充当一个消息存储容器.
3.consumer从队列中消费消息.
当存在多个consumer消费者的时候,rabbitmq会比较平均的分配消息给每个consumer,也就是说,每个consumer获取的消息都是队列消息的一个子集.
而在发布/订阅这种模式中,消息传递模型的核心思想是,生产者不发送任何信息直接到队列。事实上,生产者也不知道消息是否会被传送到任何队列。 rabbitmq引入了一个中间者exchanges.
Exchanges
1.send发送消息到指定的一个exchanges中
2.可以将任意的队列绑定到exchanges中,并且消息会随着exchanges的广播发送到队列中
3.队列充当一个消息存储容器.
4.consumer从队列中消费消息.
生产者只能发送消息到Exchange。exchanges是件很简单的事。一方面它接收来自生产者和另一方的消息,并将它们推送到队列中。Exchange必须知道它接收哪些消息。是否应该附加到特定队列?它应该被附加到多个队列吗?或者它应该被丢弃。该规则由Exchange类型定义。
exchanges有几种类型:direct, topic, headers and fanout
更多请查看:http://blog.csdn.net/yzr_java/article/details/70916084
Listing exchanges可以在命令行中监听exchanges: rabbitmqctl list_exchanges
Temporary queues
临时队列:由rabbitmq服务器自动创建的队列,当连接关闭时,临时队列将会被自动删除.
exchanges可以配合使用临时队列,将消息广播到临时队列中.
//服务器创建队列
String queueName = channel.queueDeclare().getQueue();
Bindings
将队列绑定到exchanges中,当exchanges进行广播的时候,被绑定的队列将会接收到消息.
//将队列绑定到exchange
channel.queueBind(queueName, EXCHANGE_NAME, "");
Listing bindings
可以使用命令行监听: rabbitmqctl list_bindings
package yzr.main;
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//创建exchange
//fanout将消息广播到队列中
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//服务器创建队列
String queueName = channel.queueDeclare().getQueue();
//将队列绑定到exchange
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
测试:
启动上面的ReceiveLogs类,然后http://localhost:15672登陆之后看一下exchanges视图:
然后再看一下队列视图:
[x] Sent '111 message.'
[x] Sent '211 message..'
[x] Sent '311 message...'
[x] Sent '411 message....'
[x] Sent '511 message.....'
[x] Sent '611 message.....'
[x] Sent '711 message.....'
[x] Sent '811 message.....'
[x] Sent '911 message.....'
上一篇: Publish/Subscribe
下一篇: 数据结构(快速排序——递归实现)
推荐阅读
-
JavaScript观察者模式(publish/subscribe)原理与实现方法
-
RabbitMQ消息队列之Windows下安装和部署(一)
-
RabbitMQ指南之四:路由(Routing)和直连交换机(Direct Exchange)
-
RabbitMQ指南之三:发布/订阅模式(Publish/Subscribe)
-
RabbitMQ指南之五:主题交换器(Topic Exchange)
-
RabbitMQ消息队列之基础 (二)
-
理解 Redis(9) - Publish Subscribe 消息订阅
-
python学习之RabbitMQ-----消息队列
-
RabbitMQ 入门篇之——五种工作模式
-
消息队列RabbitMQ之Spring-AMQP