C#下实现RabbitMQ。 Rabbitmqrabbitmq 示例
程序员文章站
2022-07-13 15:46:40
...
1.首先需要下载.net下的驱动类。rabbitmq.client.dll.安装后可以使用提供的文档。当然也可以直接从别的地方只下载rabbitmq.client.dll使用。
下载地址:http://www.rabbitmq.com/dotnet.html
文档和安装程序都有了。
2.然后建立项目导入引用
一、首先建立一个消息的发送者类Sender
using System; using System.Collections.Generic; using System.Linq; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Content; using System.Collections; namespace Sender { /// <summary> /// 向队列中写入一个消息 /// </summary> public class ProduceMQ { static void Main(string[] args) { //服务器所在的主机ip Uri uri = new Uri("amqp://192.168.1.99:8688/"); string exchange = "routing";//路由 string exchangeType = "direct";//交换模式 string routingKey = "rk";//路由关键字 //是否对消息队列持久化保存 bool persistMode = true; ConnectionFactory cf = new ConnectionFactory(); cf.UserName = "gyg";//某个vhost下的用户 cf.Password = "123456"; cf.VirtualHost = "gyg001";//vhost cf.RequestedHeartbeat = 0; cf.Endpoint = new AmqpTcpEndpoint(uri); //创建一个连接到具体总结点的连接 using (IConnection conn = cf.CreateConnection()) { //创建并返回一个新连接到具体节点的通道 using (IModel ch = conn.CreateModel()) { if (exchangeType != null) {//声明一个路由 ch.ExchangeDeclare(exchange, exchangeType); //声明一个队列 ch.QueueDeclare("q", true, false, false, null); //将一个队列和一个路由绑定起来。并制定路由关键字 ch.QueueBind("q1", exchange, routingKey); } ///构造消息实体对象并发布到消息队列上 IMapMessageBuilder b = new MapMessageBuilder(ch); IDictionary target = b.Headers; target["header"] = "hello world"; IDictionary targerBody = b.Body; targerBody["body"] = "hello world";//这个才是具体的发送内容 if (persistMode) { ((IBasicProperties)b.GetContentHeader()).DeliveryMode = 2; //设定传输模式 } //写入 ch.BasicPublish(exchange, routingKey, (IBasicProperties)b.GetContentHeader(), b.GetContentBody()); Console.WriteLine("写入成功"); } } } } }
二、创建一个接受者:receiver
using System; using System.Collections.Generic; using System.Linq; using System.Text; using RabbitMQ.Client; using RabbitMQ.Util; namespace receiver { public class receiver { static void Main(string[] args) { // Uri uri = new Uri("amqp://127.0.0.1:8688/"); string exchange = "routing"; string exchangeType = "direct"; string routingKey = "rk"; string serverAddress ="amqp://127.0.0.1:8688/"; ConnectionFactory cf = new ConnectionFactory(); cf.Uri = serverAddress; cf.UserName = "gyg"; cf.Password = "123456"; cf.VirtualHost = "gyg001"; cf.RequestedHeartbeat = 0; //cf.Endpoint = new AmqpTcpEndpoint(uri); using (IConnection conn = cf.CreateConnection()) { using (IModel ch = conn.CreateModel()) { //普通使用方式BasicGet //noAck = true,不需要回复,接收到消息后,queue上的消息就会清除 //noAck = false,需要回复,接收到消息后,queue上的消息不会被清除, //直到调用channel.basicAck(deliveryTag, false); //queue上的消息才会被清除 而且,在当前连接断开以前,其它客户端将不能收到此queue上的消息 BasicGetResult res = ch.BasicGet("q", false/*noAck*/); if (res!= null) { Console.WriteLine(System.Text.UTF8Encoding.UTF8.GetString(res.Body)); ch.BasicAck(res.DeliveryTag, false); } else { Console.WriteLine("无内容!!"); } ch.Close(); } conn.Close(); } Console.ReadLine(); } } }
当然了这种方式是最简单的每次发送一个。接受者每次取得一个。
注意:这种方式发送者和接受者不能同时访问服务器。
下一篇: java编程思想-多态 深入理解