欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

.Net下RabbitMQ消息队列的使用

程序员文章站 2024-03-13 12:01:51
...

整个项目就是实现C#客户端往消息队列生产消息,消费消息。

 

环境搭建

 

1、Erlang安装

 RabbitMQ由ERLANG实现,故需要安装Erlang。
1)下载Erlang
    下载官网:http://www.erlang.org/download.html
    实践安装版本:otp_win64_19.0.exe。
2)安装Erlang
    运行Exe按照提示一路Next安装下来。
3)配置Erlang
    本机安装目录:D:\Program Files\erl8.0
    环境变量Path中添加D:\Program Files\erl8.0\bin

 

.Net下RabbitMQ消息队列的使用

 

4) 检测Erlang
    命令行中输入erl命令可检测Erlang是否安装成功。

.Net下RabbitMQ消息队列的使用

 

 

2、RabbitMQ安装

 

 对应RabbitMQ的版本为V3.6.5。
1)下载RabbitMQ
    对应官网:http://www.rabbitmq.com/download.html
    下载安装包:rabbitmq-server-3.6.5.exe。
2)安装RabbitMQ
    运行Exe,一路Next安装完毕。

3)安装完成查看

.Net下RabbitMQ消息队列的使用

 

 

3、RabbitMQ启动

 

1)启动管理插件
    运行RabbitMQ Command Prompt(sbin dir)。
    输入rabbitmq-plugins enable rabbitmq_management,启动管理插件。

.Net下RabbitMQ消息队列的使用

2)启动管理服务
    运行rabbitmq-service.bat start。
3)登录管理界面
    浏览器输入:http://localhost:15672/#/。

    用户名:guest,密码guest。

.Net下RabbitMQ消息队列的使用

 

 

生产者消费者

示例代码采用C#,对应IDE为VS2013。
需要用到RabbitMQ .NET/C# client library。
RabbitMQ .NET/C# client library下载地址:http://www.rabbitmq.com/dotnet.html
.Net下RabbitMQ消息队列的使用

解压缩,里面存在RabbitMQ.Client.dll文件。

 

VS2013新建一个C#控制台项目,添加引用,把上面下载的RabbitMQ.Client.dll添加进来。

新建一个类MyRabbitMq,具体代码如下

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
 
 
using System.IO;
using System.Threading;
using RabbitMQ.Client;
 
 
 
 
namespace RabbitmqClient
{
    /// <summary>
    /// P2P模式,即一个生产者一个消费者
    /// </summary>
    public class MyRabbitMq
    {
        private readonly ConnectionFactory rabbitMqFactory;
        const string ExchangeName = "test.exchange";
        const string QueueName = "test.queue";
 
 
        public MyRabbitMq(bool isLocal = true, string remoteAddress = "localhost")
        {
            if (isLocal)
                rabbitMqFactory = new ConnectionFactory { HostName = "localhost" };
            else 
                rabbitMqFactory = new ConnectionFactory { HostName = remoteAddress };
        }
 
 
        public void Register_durable_Exchange_and_Queue()
        {
            using (IConnection conn = rabbitMqFactory.CreateConnection())
            using (IModel channel = conn.CreateModel())
            {
                channel.ExchangeDeclare(exchange: ExchangeName, type: "direct", durable: true, autoDelete: false, arguments: null);
                channel.QueueDeclare(queue: QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                channel.QueueBind(queue: QueueName, exchange: ExchangeName, routingKey: QueueName);
            }
        }
 
 
        /// <summary>
        /// 生产者,插入消息
        /// </summary>
        /// <param name="message">消息</param>
        /// <param name="persistent">是否持久化</param>
        public void SendMessage(string message, bool persistent = true)
        {
            using (IConnection conn = rabbitMqFactory.CreateConnection())
            using (IModel channel = conn.CreateModel())
            {
                var props = channel.CreateBasicProperties();
                if (persistent)
                {
                    props.Persistent = true;                
                }
 
 
                var msgBody = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(exchange: ExchangeName, routingKey: QueueName, basicProperties: props, body: msgBody);
 
 
                Console.WriteLine(" [x] Sent {0}", message);
            }
        }
 
 
        /// <summary>
        /// 消费者,取出消息
        /// </summary>
        /// <returns></returns>
        public string GetMessage()
        {
            using (IConnection conn = rabbitMqFactory.CreateConnection())
            using (IModel channel = conn.CreateModel())
            {
                BasicGetResult msgResponse = channel.BasicGet(queue: QueueName, noAck: true);
 
 
                string msgBody = Encoding.UTF8.GetString(msgResponse.Body);
 
 
                return msgBody;
            }
        }
 
 
 
 
        /// <summary>
        /// 一次都消费光,清空队列,没有消息会阻塞等
        /// </summary>
        /// <returns></returns>
        public string Consume_messages_from_Queue_Subscription()
        {
            using (IConnection conn = rabbitMqFactory.CreateConnection())
            using (IModel channel = conn.CreateModel())
            {
                var consumer = new QueueingBasicConsumer(channel);
 
 
                channel.BasicConsume(QueueName, noAck: true, consumer: consumer);
 
 
                var msgResponse = consumer.Queue.Dequeue(); //blocking
 
 
                var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
 
 
                return msgBody;
            }
        }
 
 
 
 
        public void Publish_5_messages_to_test_exchange()
        {
            using (IConnection conn = rabbitMqFactory.CreateConnection())
            using (IModel channel = conn.CreateModel())
            {
                for (var i = 0; i < 5; i++)
                {
                    var props = channel.CreateBasicProperties();
                    //props.SetPersistent(true);
                    props.Persistent = true;
                    string msg = "Hello, World!" + i;
                    var msgBody = Encoding.UTF8.GetBytes(msg);
                    channel.BasicPublish(ExchangeName, routingKey: QueueName, basicProperties: props, body: msgBody);
                }
            }
        }
 
 
    }
}

在main()中往队列插入消息

static void Main(string[] args)
        {
           <span style="white-space:pre">	</span>  //第一次测试
            MyRabbitMq test = new MyRabbitMq();
            test.Register_durable_Exchange_and_Queue();
            test.SendMessage("hahaha", true);          
            //Console.WriteLine(test.GetMessage());
            //Console.WriteLine(test.Consume_messages_from_Queue_Subscription());
        }

运行项目5次, 进入管理界面,可以看到已经创建了一个名叫test.queue的消息队列,消息Ready标志为5。

 

.Net下RabbitMQ消息队列的使用

 

 

 

后续

 

这种模式是最简单的P2P模式,还有其他5种模式后面再续。

还要研究如何设置优先级、设置消息ID问题。

 

 

参考

http://blog.csdn.net/segen_jaa/article/details/43230431 (安装)

http://www.rabbitmq.com/install-windows.html (官网)

https://github.com/ServiceStack/rabbitmq-windows (producer)

http://www.ibm.com/developerworks/cn/opensource/os-cn-rabbit-mq/   (consumer)

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/PeterPan_hai/article/details/52249397

上一篇: Java代码优化细节

下一篇: