RabbitMQ五:Topics
程序员文章站
2022-07-13 23:27:11
...
RabbitMQ五:Topics
上一篇文章中,使用direct
交换机替换之前的fanout
交换机进行有选择的日志消息接收。
本文将通过以下需求来说明Topic
:
在我们的日志系统中,我们可能不仅要根据严重性订阅日志,还要根据发出日志的源来订阅日志。
主题交换机(Topic exchange)
格式:必须是由点分隔的单词列表,单词可以是任何内容。
- *(星号)代表一个单词。
- #(hash)代表0个或多个单词。
如下:
- 路由**设置为
any.orange.rabbit
的消息将传递到两个队列。 -
lazy.orange.elephant
的消息将传递到两个队列。 -
quick.orange.fox
只会进入第一个队列。 -
lazy.brown.fox
只会进入第二个队列。 -
lazy.pink.rabbit
将仅传递到第二个队列一次,即使它匹配两个绑定 -
quick.brown.fox
与任何绑定都不匹配,因此它将被丢弃
示例
EmitLogTopic
class EmitLogTopic
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "127.0.0.1" };
using (var conncetion = factory.CreateConnection())
using (var channel = conncetion.CreateModel())
{
channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");
var routeKey = (args.Length > 0) ? args[0] : "anonymous.info";
var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "topic_logs", routingKey: routeKey, basicProperties: null, body: body);
Console.WriteLine($" [x] Sent {routeKey}:{message}");
}
}
}
ReceiveLogsTopic
class ReceiveLogsTopic
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "127.0.0.1" };
using (var conncetion = factory.CreateConnection())
using (var channel = conncetion.CreateModel())
{
channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");
var queueName = channel.QueueDeclare().QueueName;
if (args.Length < 1)
{
Console.WriteLine($"Usage:{Environment.GetCommandLineArgs()[0]} [binding_key...]");
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
Environment.ExitCode = 1;
return;
}
foreach (var bindingKey in args)
{
channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: bindingKey);
}
Console.WriteLine(" [*] Waiting for message.To exit press CTRL+C");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey;
Console.WriteLine($" [x] Received {routingKey}:{message}");
};
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
运行效果: