欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

.net core实战中rabbitmq队列数据(websocket即时推送到前端页面)

程序员文章站 2022-04-29 22:57:21
本文介绍利用rabbitmq进行数据的消息队列管理,通过websocket通讯方式把队列中的数据即时推送到前端的一种解决方案。...

本文介绍利用rabbitmq进行数据的消息队列管理,通过websocket通讯方式把队列中的数据即时推送到前端的一种解决方案。

1、首先你要安装rabbitmq服务,如何使用rabbitmq和如何安装可以自行baidu,网上有很多详细的步骤说明,这里就不说了。

2、.net core项目中引入消息队列服务组件RabbitMQ.Client,通过nuget搜索此名称,安装即可。

3、我在项目中使用的是direct模式进行消息转发,我们首先定义一个基类并继承IHostedService,这样我们就可以在startup中方便的进行对象注入,即可实现消息的实时监听,监听到客户端push推送过来的消息队列数据时,我们即可把数据进行转发推送到websocket对象,这样数据就即时到达前端页面。整个解决方案的逻辑其实就是这样一句话概括,要实现细节体会这个逻辑,去实现这个逻辑即可。

下面我粘贴一个示例代码,首先是定义的基类,然后每个业务实现基类,业务类注册到service中进行监听

/// <summary>
    /// RabbitListener.cs 这个是基类,只实现注册RabbitMQ后到监听消息,然后每个消费者自己去重写RouteKey/QueueName/消息处理函数Process
    /// </summary>
    public class RabbitListener:IHostedService
    {
        private readonly IConnection connection;
        private readonly IModel channel;
         
        public RabbitListener(IOptions<AppConfiguration> options)
        {
            try
            {
                var factory = new ConnectionFactory()
                { 
                    HostName = options.Value.RabbitHost,
                    UserName = options.Value.RabbitUserName,
                    Password = options.Value.RabbitPassword,
                    Port = options.Value.RabbitPort,
                };
                this.connection = factory.CreateConnection();
                this.channel = connection.CreateModel();
            }
            catch (Exception ex)
            {
                Console.WriteLine($"RabbitListener init error,ex:{ex.Message}");
            }
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            Register();
            return Task.CompletedTask;
        }

        protected string exchangeName;
        protected string RouteKey;
        protected string QueueName;

        // 处理消息的方法
        public virtual bool Process(string message)
        {
            throw new NotImplementedException();
        }

        // 注册消费者监听在这里
        public void Register()
        {
            Console.WriteLine($"RabbitListener register,routeKey:{RouteKey}");
            //声明交换机
            channel.ExchangeDeclare(exchange: exchangeName, type: "direct");
            //声明队列
            //var queueName = channel.QueueDeclare().QueueName;
            channel.QueueDeclare(
             queue: QueueName,//消息队列名称
             durable: false,//是否持久化,true持久化,队列会保存磁盘,服务器重启时可以保证不丢失相关信息。
             exclusive: false,//是否排他,true排他的,如果一个队列声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除.
             autoDelete: false,//是否自动删除。true是自动删除。自动删除的前提是:致少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除.
             arguments: null ////设置队列的一些其它参数
              );
            //将队列与交换机进行绑定
            foreach (var routeKey in RouteKey)
            {
                //匹配多个路由
                channel.QueueBind(queue: QueueName, exchange: exchangeName, routingKey: RouteKey);
            }
            //声明为手动确认
            //channel.BasicQos(0, 1, false);
            //定义消费者
            var consumer = new EventingBasicConsumer(channel);
            //接收到消息事件
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body.ToArray(); //.Span
                var message = Encoding.UTF8.GetString(body);
                var result = Process(message);
                if (result)
                {
                    //确认该消息已被消费
                    channel.BasicAck(ea.DeliveryTag, true);
                }
            };
            //启动消费者 设置为手动应答消息
            channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);
        }

        public void DeRegister()
        {
            this.connection.Close();
        } 

        public Task StopAsync(CancellationToken cancellationToken)
        {
            this.connection.Close();
            return Task.CompletedTask;
        }
    }
下面是继承基类后,实现业务场景的数据监听,主要就是实现路由的配置和process接受到message数据后,进行数据处理。
/// <summary>
    /// 消费者实现process消息处理
    /// </summary>
    public class ChapterLister : RabbitListener
    {
        private readonly ILogger<RabbitListener> _logger;

        // 因为Process函数是委托回调,直接将其他Service注入的话两者不在一个scope,
        // 这里要调用其他的Service实例只能用IServiceProvider CreateScope后获取实例对象
        private readonly IServiceProvider _services;

        public ChapterLister(IServiceProvider services, IOptions<AppConfiguration> options,
         ILogger<RabbitListener> logger) : base(options)
        { 
            base.exchangeName = "exchange";
            base.RouteKey =  "TX";
            base.QueueName = "QN" + new Random().Next(1, 1000).ToString();
            _logger = logger;
            _services = services;

        }

        public override bool Process(string message)
        {
            var taskMessage = JToken.Parse(message);
            if (taskMessage == null)
            {
                // 返回false 的时候回直接驳回此消息,表示处理不了
                return false;
            }
            try
            {
                using (var scope = _services.CreateScope())
                {
                    var xxxService = scope.ServiceProvider.GetRequiredService<XXXXService>();
 
                    return true;
                }
            }
            catch (Exception ex)
            {
                _logger.LogInformation($"Process fail,error:{ex.Message},stackTrace:{ex.StackTrace},message:{message}");
                _logger.LogError(-1, ex, "Process fail");
                return false;
            }

        }
    }

以上主要是接受到了rabbitmq客户端push过来的数据,已经进入到message对象中,下面就可以把数据进行转发。

我这里介绍的是通过websocket实时转发,在以上的scope代码中我们定义了一个service的处理代码,实际修改为我们的websocket对象,用wsk对象进行数据转发到前端页面上,至于如何进行socket通讯看我发的这篇文章http://www.10qianwan.com/articledetail/779497.html

整个的解决方案大概介绍就是这样的,涉及到消息队列存储,消息监听,实时转发到前端页面。在实际开发项目中,我们可能就需要即时推送数据,通过api介绍到数据之后我们进行队列存储,消费队列数据进行转发,这样做到即时,比如我们的一些股票期货这样的软件就需要即时通讯,数据即时展示在客户端,就需要很强的及时转发机制,当然这个只是一个思路介绍,希望能对大家的项目开发起到帮助!

相关标签: rabbitmq websocket