.Net下RabbitMQ消息队列的使用
整个项目就是实现C#客户端往消息队列生产消息,消费消息。
环境搭建
1、Erlang安装
RabbitMQ由ERLANG实现,故需要安装Erlang。
1)下载Erlang
下载官网:http://www.erlang.org/download.html。
实践安装版本:otp_win64_19.0.exe。
2)安装Erlang
运行Exe按照提示一路Next安装下来。
3)配置Erlang
本机安装目录:D:\Program Files\erl8.0
环境变量Path中添加D:\Program Files\erl8.0\bin
4) 检测Erlang
命令行中输入erl命令可检测Erlang是否安装成功。
2、RabbitMQ安装
对应RabbitMQ的版本为V3.6.5。
1)下载RabbitMQ
对应官网:http://www.rabbitmq.com/download.html。
下载安装包:rabbitmq-server-3.6.5.exe。
2)安装RabbitMQ
运行Exe,一路Next安装完毕。
3)安装完成查看
3、RabbitMQ启动
1)启动管理插件
运行RabbitMQ Command Prompt(sbin dir)。
输入rabbitmq-plugins enable rabbitmq_management,启动管理插件。
2)启动管理服务
运行rabbitmq-service.bat start。
3)登录管理界面
浏览器输入:http://localhost:15672/#/。
用户名:guest,密码guest。
生产者消费者
示例代码采用C#,对应IDE为VS2013。
需要用到RabbitMQ .NET/C# client library。
RabbitMQ .NET/C# client library下载地址:http://www.rabbitmq.com/dotnet.html
解压缩,里面存在RabbitMQ.Client.dll文件。
VS2013新建一个C#控制台项目,添加引用,把上面下载的RabbitMQ.Client.dll添加进来。
新建一个类MyRabbitMq,具体代码如下
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.IO;
using System.Threading;
using RabbitMQ.Client;
namespace RabbitmqClient
{
/// <summary>
/// P2P模式,即一个生产者一个消费者
/// </summary>
public class MyRabbitMq
{
private readonly ConnectionFactory rabbitMqFactory;
const string ExchangeName = "test.exchange";
const string QueueName = "test.queue";
public MyRabbitMq(bool isLocal = true, string remoteAddress = "localhost")
{
if (isLocal)
rabbitMqFactory = new ConnectionFactory { HostName = "localhost" };
else
rabbitMqFactory = new ConnectionFactory { HostName = remoteAddress };
}
public void Register_durable_Exchange_and_Queue()
{
using (IConnection conn = rabbitMqFactory.CreateConnection())
using (IModel channel = conn.CreateModel())
{
channel.ExchangeDeclare(exchange: ExchangeName, type: "direct", durable: true, autoDelete: false, arguments: null);
channel.QueueDeclare(queue: QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind(queue: QueueName, exchange: ExchangeName, routingKey: QueueName);
}
}
/// <summary>
/// 生产者,插入消息
/// </summary>
/// <param name="message">消息</param>
/// <param name="persistent">是否持久化</param>
public void SendMessage(string message, bool persistent = true)
{
using (IConnection conn = rabbitMqFactory.CreateConnection())
using (IModel channel = conn.CreateModel())
{
var props = channel.CreateBasicProperties();
if (persistent)
{
props.Persistent = true;
}
var msgBody = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: ExchangeName, routingKey: QueueName, basicProperties: props, body: msgBody);
Console.WriteLine(" [x] Sent {0}", message);
}
}
/// <summary>
/// 消费者,取出消息
/// </summary>
/// <returns></returns>
public string GetMessage()
{
using (IConnection conn = rabbitMqFactory.CreateConnection())
using (IModel channel = conn.CreateModel())
{
BasicGetResult msgResponse = channel.BasicGet(queue: QueueName, noAck: true);
string msgBody = Encoding.UTF8.GetString(msgResponse.Body);
return msgBody;
}
}
/// <summary>
/// 一次都消费光,清空队列,没有消息会阻塞等
/// </summary>
/// <returns></returns>
public string Consume_messages_from_Queue_Subscription()
{
using (IConnection conn = rabbitMqFactory.CreateConnection())
using (IModel channel = conn.CreateModel())
{
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(QueueName, noAck: true, consumer: consumer);
var msgResponse = consumer.Queue.Dequeue(); //blocking
var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
return msgBody;
}
}
public void Publish_5_messages_to_test_exchange()
{
using (IConnection conn = rabbitMqFactory.CreateConnection())
using (IModel channel = conn.CreateModel())
{
for (var i = 0; i < 5; i++)
{
var props = channel.CreateBasicProperties();
//props.SetPersistent(true);
props.Persistent = true;
string msg = "Hello, World!" + i;
var msgBody = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish(ExchangeName, routingKey: QueueName, basicProperties: props, body: msgBody);
}
}
}
}
}
在main()中往队列插入消息
static void Main(string[] args)
{
<span style="white-space:pre"> </span> //第一次测试
MyRabbitMq test = new MyRabbitMq();
test.Register_durable_Exchange_and_Queue();
test.SendMessage("hahaha", true);
//Console.WriteLine(test.GetMessage());
//Console.WriteLine(test.Consume_messages_from_Queue_Subscription());
}
运行项目5次, 进入管理界面,可以看到已经创建了一个名叫test.queue的消息队列,消息Ready标志为5。
后续
这种模式是最简单的P2P模式,还有其他5种模式后面再续。
还要研究如何设置优先级、设置消息ID问题。
参考
http://blog.csdn.net/segen_jaa/article/details/43230431 (安装)
http://www.rabbitmq.com/install-windows.html (官网)
https://github.com/ServiceStack/rabbitmq-windows (producer)
http://www.ibm.com/developerworks/cn/opensource/os-cn-rabbit-mq/ (consumer)
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/PeterPan_hai/article/details/52249397
上一篇: Java代码优化细节