RabbitMQ-Direct模式
程序员文章站
2022-07-11 08:18:41
简介 RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息中间件,基于Erlang语言编写。 P:(Producing)生产者,生产只意味着发送消息。 Q: (queue_name)队列,队列是位于RabbitMQ中的post box的名称 C: (Consuming)消费者,消费者主要 ......
简介
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息中间件,基于Erlang语言编写。
P:(Producing)生产者,生产只意味着发送消息。
Q: (queue_name)队列,队列是位于RabbitMQ中的post box的名称
C: (Consuming)消费者,消费者主要是等待接收消息的程序
开发准备
- netcoretest.core:该工程主要封装了RabbitMQ的公用方法
- rabbitmqclient :该工程为生产者
- rabbitmqserver :该工程为消费者
1.创建netcoretest.core类库项目
1.1 安装项目依赖(通过NuGet安装RabbitMQ.Client包)
2.定义接口
using netcoretest.core.model;
using System;
using System.Collections.Generic;
using System.Text;

namespace netcoretest.core.iserver
{
    /// <summary>
    /// Connection-lifecycle operations of a message-queue service:
    /// connect, create the queue, and close the channel and connection.
    /// </summary>
    public interface iconnectionserver
    {
        /// <summary>
        /// Connects to the service.
        /// </summary>
        void connection();

        /// <summary>
        /// Creates the message queue.
        /// </summary>
        void createqueuedir();

        /// <summary>
        /// Closes the connection.
        /// </summary>
        void closeconnection();

        /// <summary>
        /// Closes the channel.
        /// </summary>
        void closechannel();
    }
}
using System;
using System.Collections.Generic;
using System.Text;

namespace netcoretest.core.iserver
{
    /// <summary>
    /// Message operations of a message-queue service: send and receive.
    /// </summary>
    public interface imessageservice
    {
        /// <summary>
        /// Sends a message.
        /// </summary>
        /// <param name="msg">Message content.</param>
        void sendmsg(string msg);

        /// <summary>
        /// Gets a message.
        /// </summary>
        /// <returns>The message text obtained by the implementation.</returns>
        string getmsg();
    }
}
using System;
using System.Collections.Generic;
using System.Text;

namespace netcoretest.core.iserver
{
    /// <summary>
    /// Combines the message operations (<see cref="imessageservice"/>) and the
    /// connection operations (<see cref="iconnectionserver"/>) into one contract.
    /// </summary>
    public interface irabbitmqservice : imessageservice, iconnectionserver
    {
    }
}
3.编写rabbitmq辅助类
using netcoretest.core.iserver;
using netcoretest.core.model;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;

namespace netcoretest.core
{
    /// <summary>
    /// RabbitMQ helper that connects to a broker, declares a direct exchange with
    /// one bound queue, publishes UTF-8 text messages and consumes them.
    /// </summary>
    public class rabbitmqmodel : irabbitmqservice
    {
        private readonly ConnectionFactory factory = null;
        private IModel channel;
        private IConnection connetction;
        readonly string exchangename; // exchange name
        readonly string routekey;     // routing key
        readonly string queuename;    // queue name

        /// <summary>
        /// Builds the connection factory and captures the exchange, routing key
        /// and queue names from <paramref name="model"/>.
        /// </summary>
        /// <param name="model">Broker credentials, host, port and exchange settings.</param>
        /// <exception cref="ArgumentNullException"><paramref name="model"/> is null.</exception>
        public rabbitmqmodel(hostmodel model)
        {
            if (model == null)
            {
                throw new ArgumentNullException(nameof(model));
            }

            factory = new ConnectionFactory
            {
                UserName = model.username,
                Password = model.password,
                // Honour the configured host; fall back to the previously
                // hard-coded "localhost" when none is supplied.
                HostName = string.IsNullOrEmpty(model.host) ? "localhost" : model.host,
                Port = model.port,
            };
            exchangename = model.exchangemodel.exchangename;
            routekey = model.exchangemodel.routekey;
            queuename = model.exchangemodel.queuename;
        }

        /// <summary>
        /// Opens the connection and a channel on it. A failure is written to the
        /// console and not rethrown; later operations then fail with an
        /// <see cref="InvalidOperationException"/>.
        /// </summary>
        public void connection()
        {
            try
            {
                // Open the connection.
                connetction = factory.CreateConnection();
                // Open a channel on the connection.
                channel = connetction.CreateModel();
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.ToString());
            }
        }

        /// <summary>
        /// Declares a non-durable direct exchange, declares a non-durable queue,
        /// and binds the queue to the exchange with the routing key.
        /// </summary>
        public void createqueuedir()
        {
            EnsureChannel();
            // Declare a direct-type exchange.
            channel.ExchangeDeclare(exchangename, ExchangeType.Direct, false, false, null);
            // Declare the queue.
            channel.QueueDeclare(queuename, false, false, false, null);
            // Bind the queue to the exchange.
            channel.QueueBind(queuename, exchangename, routekey, null);
        }

        /// <summary>
        /// Publishes <paramref name="msg"/> as UTF-8 bytes to the exchange using the routing key.
        /// </summary>
        /// <param name="msg">Message content.</param>
        public void sendmsg(string msg)
        {
            EnsureChannel();
            var sendbytes = Encoding.UTF8.GetBytes(msg);
            channel.BasicPublish(exchangename, routekey, null, sendbytes);
        }

        /// <summary>
        /// Closes the channel if one was opened.
        /// </summary>
        public void closechannel()
        {
            channel?.Close();
        }

        /// <summary>
        /// Closes the connection if one was opened.
        /// </summary>
        public void closeconnection()
        {
            connetction?.Close();
        }

        /// <summary>
        /// Starts a manually-acknowledging consumer on the queue, prints every
        /// received message, blocks until a key is pressed, then closes the
        /// channel and the connection.
        /// </summary>
        /// <returns>The last message received before the key press, or null if none arrived.</returns>
        public string getmsg()
        {
            EnsureChannel();

            // Event-based consumer.
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
            string msg = null;

            // Message-received handler.
            consumer.Received += (ch, ea) =>
            {
                // NOTE(review): on RabbitMQ.Client 6.x ea.Body is ReadOnlyMemory<byte>
                // and needs ea.Body.ToArray() here - confirm the package version.
                var message = Encoding.UTF8.GetString(ea.Body);
                msg = message;
                Console.WriteLine($"收到消息: {message}");
                // Acknowledge that the message has been consumed.
                channel.BasicAck(ea.DeliveryTag, false);
            };

            // Start the consumer with manual acknowledgement (autoAck = false).
            channel.BasicConsume(queuename, false, consumer);
            Console.WriteLine("消费者已启动");
            Console.ReadKey();

            // Close the channel before its connection; closing the connection
            // first leaves the channel already closed when Close() is called on it.
            closechannel();
            closeconnection();
            return msg;
        }

        /// <summary>
        /// Throws a descriptive error when no channel is available, instead of
        /// letting the caller hit a NullReferenceException.
        /// </summary>
        private void EnsureChannel()
        {
            if (channel == null)
            {
                throw new InvalidOperationException(
                    "No open channel: call connection() successfully before using the queue.");
            }
        }
    }
}
4.创建direct模式发送类
using netcoretest.core.model;
using System;
using System.Collections.Generic;
using System.Text;

namespace netcoretest.core.exchangetypemodel
{
    /// <summary>
    /// Direct-mode sender: wraps a <see cref="rabbitmqmodel"/> configured with
    /// fixed demo settings and connects it on construction.
    /// </summary>
    public class directpost
    {
        rabbitmqmodel rabbitmqmodel;

        /// <summary>
        /// Builds the host/exchange settings, creates the helper and opens the connection.
        /// </summary>
        public directpost()
        {
            hostmodel hostmodel = new hostmodel();
            hostmodel.username = "admin";
            hostmodel.password = "admin";
            hostmodel.host = "127.0.0.1";
            hostmodel.port = 5672;
            hostmodel.exchangemodel = new exchangemodel
            {
                exchangename = "clentname",
                queuename = "clent",
                routekey = "clentroute"
            };

            rabbitmqmodel = new rabbitmqmodel(hostmodel);
            rabbitmqmodel.connection();
        }

        /// <summary>
        /// Declares the exchange and queue and binds them.
        /// </summary>
        public void createqueue()
        {
            rabbitmqmodel.createqueuedir();
        }

        /// <summary>
        /// Publishes a message through the helper.
        /// </summary>
        /// <param name="msg">Message content.</param>
        public void sendmsg(string msg)
        {
            rabbitmqmodel.sendmsg(msg);
        }

        /// <summary>
        /// Starts consuming through the helper; the helper's return value is discarded.
        /// </summary>
        public void getmsg()
        {
            rabbitmqmodel.getmsg();
        }
    }
}
5.创建rabbitmqclient控制台应用程序
using netcoretest.core;
using netcoretest.core.exchangetypemodel;
using netcoretest.core.model;
using RabbitMQ.Client;
using System;

namespace rabbitmqclient
{
    /// <summary>
    /// Producer console application: publishes every line typed by the user
    /// until "exit" is entered or input ends.
    /// </summary>
    class program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("消息生产者开始生产数据!");
            Console.WriteLine("输入exit退出!");

            directpost poster = new directpost();
            poster.createqueue();

            string input;
            // Stop on end of input (ReadLine returns null) or on the "exit"
            // sentinel, and do so BEFORE publishing so the sentinel itself is
            // never sent to the queue. Ordinal comparison avoids the
            // culture-sensitive ToLower() round trip.
            while ((input = Console.ReadLine()) != null
                   && !string.Equals(input.Trim(), "exit", StringComparison.OrdinalIgnoreCase))
            {
                poster.sendmsg(input);
            }
        }
    }
}
6.创建rabbitmqserver控制台应用程序
using netcoretest.core;
using netcoretest.core.exchangetypemodel;
using netcoretest.core.model;
using System;
using System.Text;

namespace rabbitmqserver
{
    /// <summary>
    /// Consumer console application: connects through <see cref="directpost"/>
    /// and starts receiving messages.
    /// </summary>
    class program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("hello world!");

            directpost receiver = new directpost();
            // Returns only after the underlying helper stops consuming.
            receiver.getmsg();
        }
    }
}
7.执行rabbitmqclient和rabbitmqserver
上一篇: azure 上传blob到ams(CreateFromBlob)
下一篇: php.ini 中文版