欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RabbitMQ五:Topics

程序员文章站 2022-07-13 23:27:11
...

RabbitMQ五:Topics

上一篇文章中,使用direct交换机替换之前的fanout交换机进行有选择的日志消息接收。

本文将通过以下需求来说明Topic
在我们的日志系统中,我们可能不仅要根据严重性订阅日志,还要根据发出日志的源来订阅日志。

主题交换机(Topic exchange)

格式:必须是由点分隔的单词列表,单词可以是任何内容。

  • *(星号)代表一个单词。
  • #(hash)代表0个或多个单词。

如下:

RabbitMQ五:Topics

  • 路由**设置为 any.orange.rabbit 的消息将传递到两个队列。
  • lazy.orange.elephant 的消息将传递到两个队列。
  • quick.orange.fox 只会进入第一个队列。
  • lazy.brown.fox 只会进入第二个队列。
  • lazy.pink.rabbit 将仅传递到第二个队列一次,即使它匹配两个绑定
  • quick.brown.fox 与任何绑定都不匹配,因此它将被丢弃

示例

EmitLogTopic

class EmitLogTopic
{
    static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "127.0.0.1" };
        using (var conncetion = factory.CreateConnection())
        using (var channel = conncetion.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");
            var routeKey = (args.Length > 0) ? args[0] : "anonymous.info";
            var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World!";
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "topic_logs", routingKey: routeKey, basicProperties: null, body: body);
            Console.WriteLine($" [x] Sent {routeKey}:{message}");
        }
    }
}

ReceiveLogsTopic

class ReceiveLogsTopic
{
    static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "127.0.0.1" };
        using (var conncetion = factory.CreateConnection())
        using (var channel = conncetion.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");
            var queueName = channel.QueueDeclare().QueueName;

            if (args.Length < 1)
            {
                Console.WriteLine($"Usage:{Environment.GetCommandLineArgs()[0]} [binding_key...]");
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
                Environment.ExitCode = 1;
                return;
            }

            foreach (var bindingKey in args)
            {
                channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: bindingKey);
            }

            Console.WriteLine(" [*] Waiting for message.To exit press CTRL+C");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                var routingKey = ea.RoutingKey;
                Console.WriteLine($" [x] Received {routingKey}:{message}");
            };
            channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

运行效果:
RabbitMQ五:Topics

Topics

Github