欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

RabbitMQ-Direct模式

程序员文章站 2022-07-11 08:18:41
简介 RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息中间件,基于Erlang语言编写。 P:(producling)生产者,生产只意味着发送消息。 Q: (queue_name)队列,队列是位于rabbitmq中的post box的名称 C: (Consuming)消费者,消费者主要 ......

简介

 rabbitmq是实现了高级消息队列协议(amqp)的开源消息中间件,基于erlang语言编写。

RabbitMQ-Direct模式

 

p:(producling)生产者,生产只意味着发送消息。

 RabbitMQ-Direct模式

q: (queue_name)队列,队列是位于rabbitmq中的post box的名称

 

c: (consuming)消费者,消费者主要是等待接收消息的程序

 

 

开发准备

  •  netcoretset.core:该工程主要封装了rabbitmq的公用方法
  • rabbitmqclient    :该工程为生产者
  • rabbitmqserver  :该工程为消费者

 

1.创建netcoretset.core类库项目

 

RabbitMQ-Direct模式

1.1 安装项目依赖

RabbitMQ-Direct模式

 

2.定义接口

using netcoretest.core.model;
using system;
using system.collections.generic;
using system.text;

namespace netcoretest.core.iserver
{
    public interface iconnectionserver
    {
      
        /// <summary>
        /// 连接服务
        /// </summary>
        void connection();
        /// <summary>
        /// 创建消息队列
        /// </summary>
        /// <param name="quename">队列名称</param>
        void createqueuedir();
        /// <summary>
        /// 关闭连接
        /// </summary>
        void closeconnection();
        /// <summary>
        /// 关闭通道
        /// </summary>
        void closechannel();


    }
}
using system;
using system.collections.generic;
using system.text;

namespace netcoretest.core.iserver
{
    public interface imessageservice
    {
        /// <summary>
        /// 发送消息
        /// </summary>
        /// <param name="msg">消息内容</param>
        void sendmsg(string msg);
        /// <summary>
        /// 获取消息
        /// </summary>
        /// <returns></returns>
        string getmsg();
    }
}
using system;
using system.collections.generic;
using system.text;

namespace netcoretest.core.iserver
{
   public interface irabbitmqservice:imessageservice,iconnectionserver
    {
    }
}

 

 3.编写rabbitmq辅助类

using netcoretest.core.iserver;
using netcoretest.core.model;
using rabbitmq.client;
using rabbitmq.client.events;
using system;
using system.collections.generic;
using system.text;

namespace netcoretest.core
{
    public class rabbitmqmodel : irabbitmqservice
    {

        private readonly connectionfactory factory = null;
        private imodel channel;
        private iconnection connetction;
        readonly string exchangename;//交换机名称
        readonly string routekey;//路由名称
        readonly string queuename;///队列名称
        public rabbitmqmodel(hostmodel model)
        {
            /// <summary>
            /// 创建连接工厂
            /// </summary>
            factory = new connectionfactory
            {
                username = model.username,
                password = model.password,
                hostname = "localhost",
                port = model.port,
            };
            exchangename = model.exchangemodel.exchangename;
            routekey = model.exchangemodel.routekey;
            queuename = model.exchangemodel.queuename;
        }
        /// <summary>
        /// 创建连接
        /// </summary>
        public void connection()
        {
            try
            {
                //创建连接
                connetction = factory.createconnection();
                //创建信道
                channel = connetction.createmodel();
            }
            catch (exception ex)
            {
                console.writeline(ex.tostring());
            }
        }

        public void createqueuedir()
        {
            //定义一个direct类型的交换机
            channel.exchangedeclare(exchangename, exchangetype.direct, false, false, null);
            //定义一个队列
            channel.queuedeclare(queuename, false, false, false, null);
            //将队列绑定交换机
            channel.queuebind(queuename, exchangename, routekey, null);
        }public void sendmsg(string msg)
        {
            var sendbytes = encoding.utf8.getbytes(msg);
            channel.basicpublish(exchangename, routekey, null, sendbytes);
        }

        public void closechannel()
        {
            channel.close();
        }

        public void closeconnection()
        {
            connetction.close();
        }

        public string getmsg()
        {
            //事件基本消费者
            eventingbasicconsumer consumer = new eventingbasicconsumer(channel);
            string msg = null;
            //接收到消息事件
            consumer.received += (ch, ea) =>
            {
                var message = encoding.utf8.getstring(ea.body);
                msg = message;
                console.writeline($"收到消息: {message}");
                //确认该消息已被消费
                channel.basicack(ea.deliverytag, false);
            };
            //启动消费者 设置为手动应答消息
            channel.basicconsume(queuename, false, consumer);
            console.writeline("消费者已启动");
            console.readkey();
            closeconnection();
            closechannel();
            return msg;
        }


    }
}

4.创建direct模式发送类

using netcoretest.core.model;
using system;
using system.collections.generic;
using system.text;

namespace netcoretest.core.exchangetypemodel
{

    /// <summary>
    /// direct模式发送
    /// </summary>
    public class directpost
    {


        rabbitmqmodel rabbitmqmodel;

        public directpost()
        {
            hostmodel hostmodel = new hostmodel();
            hostmodel.username = "admin";
            hostmodel.password = "admin";
            hostmodel.host = "127.0.0.1";
            hostmodel.port = 5672;
            hostmodel.exchangemodel =new exchangemodel {
                exchangename = "clentname",
                queuename = "clent",
                routekey = "clentroute"
            };
            rabbitmqmodel = new rabbitmqmodel(hostmodel);
            rabbitmqmodel.connection();

        }
        public void createqueue()
        {
            rabbitmqmodel.createqueuedir();
        }
        public void sendmsg(string msg)
        {
            rabbitmqmodel.sendmsg(msg);
        }
        public void getmsg()
        {
            rabbitmqmodel.getmsg();
        }
    }
}

5.创建rabbitmqclient控制台应用程序

 

 

using netcoretest.core;
using netcoretest.core.exchangetypemodel;
using netcoretest.core.model;
using rabbitmq.client;
using system;

namespace rabbitmqclient
{
    class program
    {
        static void main(string[] args)
        {
            console.writeline("消息生产者开始生产数据!");
            console.writeline("输入exit退出!");
            directpost directpost = new directpost();
            directpost.createqueue();
            string input;
           
            do
            {
                input = console.readline();
                directpost.sendmsg(input);

            } while (input.trim().tolower() != "exit");


        }
    }
}

6.创建rabbitmqservice控制台应用程序

using netcoretest.core;
using netcoretest.core.exchangetypemodel;
using netcoretest.core.model;
using system;
using system.text;

namespace rabbitmqserver
{
    class program
    {
        static void main(string[] args)
        {
            console.writeline("hello world!");

            directpost directpost = new directpost();
            directpost.getmsg();
        

        }
    }
}

7.执行rabbitmqclient和rabbitmqserver

RabbitMQ-Direct模式