.net core实战中rabbitmq队列数据(websocket即时推送到前端页面)
本文介绍利用rabbitmq进行数据的消息队列管理,通过websocket通讯方式把队列中的数据即时推送到前端的一种解决方案。
1、首先你要安装rabbitmq服务,如何使用rabbitmq和如何安装可以自行baidu,网上有很多详细的步骤说明,这里就不说了。
2、.net core项目中引入消息队列服务组件RabbitMQ.Client,通过nuget搜索此名称,安装即可。
3、我在项目中使用的是direct模式进行消息转发,我们首先定义一个基类并继承IHostedService,这样我们就可以在startup中方便的进行对象注入,即可实现消息的实时监听,监听到客户端push推送过来的消息队列数据时,我们即可把数据进行转发推送到websocket对象,这样数据就即时到达前端页面。整个解决方案的逻辑其实就是这样一句话概括,要实现细节体会这个逻辑,去实现这个逻辑即可。
下面我粘贴一个示例代码,首先是定义的基类,然后每个业务实现基类,业务类注册到service中进行监听
/// <summary> /// RabbitListener.cs 这个是基类,只实现注册RabbitMQ后到监听消息,然后每个消费者自己去重写RouteKey/QueueName/消息处理函数Process /// </summary> public class RabbitListener:IHostedService { private readonly IConnection connection; private readonly IModel channel; public RabbitListener(IOptions<AppConfiguration> options) { try { var factory = new ConnectionFactory() { HostName = options.Value.RabbitHost, UserName = options.Value.RabbitUserName, Password = options.Value.RabbitPassword, Port = options.Value.RabbitPort, }; this.connection = factory.CreateConnection(); this.channel = connection.CreateModel(); } catch (Exception ex) { Console.WriteLine($"RabbitListener init error,ex:{ex.Message}"); } } public Task StartAsync(CancellationToken cancellationToken) { Register(); return Task.CompletedTask; } protected string exchangeName; protected string RouteKey; protected string QueueName; // 处理消息的方法 public virtual bool Process(string message) { throw new NotImplementedException(); } // 注册消费者监听在这里 public void Register() { Console.WriteLine($"RabbitListener register,routeKey:{RouteKey}"); //声明交换机 channel.ExchangeDeclare(exchange: exchangeName, type: "direct"); //声明队列 //var queueName = channel.QueueDeclare().QueueName; channel.QueueDeclare( queue: QueueName,//消息队列名称 durable: false,//是否持久化,true持久化,队列会保存磁盘,服务器重启时可以保证不丢失相关信息。 exclusive: false,//是否排他,true排他的,如果一个队列声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除. autoDelete: false,//是否自动删除。true是自动删除。自动删除的前提是:致少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除. arguments: null ////设置队列的一些其它参数 ); //将队列与交换机进行绑定 foreach (var routeKey in RouteKey) { //匹配多个路由 channel.QueueBind(queue: QueueName, exchange: exchangeName, routingKey: RouteKey); } //声明为手动确认 //channel.BasicQos(0, 1, false); //定义消费者 var consumer = new EventingBasicConsumer(channel); //接收到消息事件 consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); //.Span var message = Encoding.UTF8.GetString(body); var result = Process(message); if (result) { //确认该消息已被消费 channel.BasicAck(ea.DeliveryTag, true); } }; //启动消费者 设置为手动应答消息 channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer); } public void DeRegister() { this.connection.Close(); } public Task StopAsync(CancellationToken cancellationToken) { this.connection.Close(); return Task.CompletedTask; } }下面是继承基类后,实现业务场景的数据监听,主要就是实现路由的配置和process接受到message数据后,进行数据处理。
/// <summary> /// 消费者实现process消息处理 /// </summary> public class ChapterLister : RabbitListener { private readonly ILogger<RabbitListener> _logger; // 因为Process函数是委托回调,直接将其他Service注入的话两者不在一个scope, // 这里要调用其他的Service实例只能用IServiceProvider CreateScope后获取实例对象 private readonly IServiceProvider _services; public ChapterLister(IServiceProvider services, IOptions<AppConfiguration> options, ILogger<RabbitListener> logger) : base(options) { base.exchangeName = "exchange"; base.RouteKey = "TX"; base.QueueName = "QN" + new Random().Next(1, 1000).ToString(); _logger = logger; _services = services; } public override bool Process(string message) { var taskMessage = JToken.Parse(message); if (taskMessage == null) { // 返回false 的时候回直接驳回此消息,表示处理不了 return false; } try { using (var scope = _services.CreateScope()) { var xxxService = scope.ServiceProvider.GetRequiredService<XXXXService>(); return true; } } catch (Exception ex) { _logger.LogInformation($"Process fail,error:{ex.Message},stackTrace:{ex.StackTrace},message:{message}"); _logger.LogError(-1, ex, "Process fail"); return false; } } }
以上主要是接受到了rabbitmq客户端push过来的数据,已经进入到message对象中,下面就可以把数据进行转发。
我这里介绍的是通过websocket实时转发,在以上的scope代码中我们定义了一个service的处理代码,实际修改为我们的websocket对象,用wsk对象进行数据转发到前端页面上,至于如何进行socket通讯看我发的这篇文章http://www.10qianwan.com/articledetail/779497.html
整个的解决方案大概介绍就是这样的,涉及到消息队列存储,消息监听,实时转发到前端页面。在实际开发项目中,我们可能就需要即时推送数据,通过api介绍到数据之后我们进行队列存储,消费队列数据进行转发,这样做到即时,比如我们的一些股票期货这样的软件就需要即时通讯,数据即时展示在客户端,就需要很强的及时转发机制,当然这个只是一个思路介绍,希望能对大家的项目开发起到帮助!