欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

使用.NET Core搭建分布式音频效果处理服务(五)利用消息队列提升水平扩展灵活性

程序员文章站 2022-12-29 12:31:57
消息队列 神马是消息队列,看看某度的原话“在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量”。 其实消息队列还可以用于解耦,在多层项目模型或中型项目以上,都会用到消息队列,减少层与层之间的耦合;还可以做跨进 ......

 使用.NET Core搭建分布式音频效果处理服务(五)利用消息队列提升水平扩展灵活性

消息队列

神马是消息队列,看看某度的原话“在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量”。

其实消息队列还可以用于解耦,在多层项目模型或中型项目以上,都会用到消息队列,减少层与层之间的耦合;还可以做跨进程间的通讯(传输率显然比不上rpc)。

上一节说道最终需要采用消息队列来进行分离前级和后级,并且采用异步方式,用于提高业务服务器的吞吐率,不过,虽然分离了,如果后级服务器的处理能力达不到请求数或接*衡,那么分离也是无用的,甚至会影响整个系统的执行效率。比如这样

1台业务逻辑服务器 => 生产消息 => 消息服务器 => 消费者(处理)

其实就等同于:

1台业务逻辑服务器 => 消费者(处理)

或者换一种场景:

一个银行有多个窗口,但目前只打开了一个窗口进行服务,我们假设这个窗口的服务人员是每半小时完成一个用户,如果有10个用户,那么就是10*30=300分钟,最后一个用户需要排队对待270分钟后才轮到他到窗口,这是多么荒唐的事情(很多服务行业的通病),用户肯定会非常的不耐烦。如果我们再增开3个闲置的窗口,并且配上相应的服务人员,一次接待4位客人,那么这个时间将会缩短3倍,变成只需要90分钟即可轮到他。

在这个场景中,增设窗口就属于水平扩展,而不是督促服务人员提高工作效率、这种垂直扩展来提高整体效率(毕竟不管是机器还是人,都有极限)。服务器消息队列中的消费者也是如此,并且相同类型(或处理逻辑相同)的扩展完全属于傻瓜化的,可比增设窗口简单多了。

 在来看一下上一节中的最后一张图片:

使用.NET Core搭建分布式音频效果处理服务(五)利用消息队列提升水平扩展灵活性

“ffmpeg服务器...n”就属于傻瓜式的水平扩展,想想一下:同一份代码,部署到不同的服务器上面,是不是特别的轻松。

 

使用rabbitmq进行水平扩展

rabbitmq的安装这里就不介绍了,先搞清楚他是一个amqp标准即可,由于我们这个项目只涉及到一个处理逻辑——音频处理,而不讨论与其他项目相关,所以我们将交换机exchange,队列queue,路由关键字routing key均设为直连一根线通到底,无需中间做任何交换,当然也不需要交换机进行广播fanout,完全的direct即可。

去重(重复消费)的问题:

ribbitmq利用ack机制来确定消息的可靠性,但是需要消费端完全完成这条消息后才会做出应答,这样便会造成消费不等,即一个还在处理消费,而另一也紧跟着处理这个消费。一般出在任务超时,或者没有及时返回状态,引起任务重新入队列,重新消费,在rabbtimq里连接的断开也会触发消息重新入队列,解决方案有很多,也可以参考幂等性方法。

将一条消息做一个唯一的标签,例如guid,每次在处理前先判断这个标签的状态是否被处理,如果已被处理,该消费端就放弃这条消息。

废话不多,开始:

 

建立任务并发送消息

首先我们需要创建一个任务,这个任务可以是个标识,也可以是一个存储,但任务名称必须是唯一(id)的,用随机字符串生成一组唯一id,笔者提供一个方法,供大家参考:

使用.NET Core搭建分布式音频效果处理服务(五)利用消息队列提升水平扩展灵活性
 1 ///<summary>
 2         ///生成随机字符串 
 3         ///</summary>
 4         ///<param name="length">目标字符串的长度</param>
 5         ///<param name="usenum">是否包含数字,1=包含,默认为包含</param>
 6         ///<param name="uselow">是否包含小写字母,1=包含,默认为包含</param>
 7         ///<param name="useupp">是否包含大写字母,1=包含,默认为包含</param>
 8         ///<param name="usespe">是否包含特殊字符,1=包含,默认为不包含</param>
 9         ///<param name="custom">要包含的自定义字符,直接输入要包含的字符列表</param>
10         ///<returns>指定长度的随机字符串</returns>
11         public static string getrandomstring(int length, bool usenum, bool uselow, bool useupp, bool usespe,
12             string custom)
13         {
14             byte[] b = new byte[4];
15             new system.security.cryptography.rngcryptoserviceprovider().getbytes(b);
16             random r = new random(bitconverter.toint32(b, 0));
17             string s = null, str = custom;
18             if (usenum == true)
19             {
20                 str += "0123456789";
21             }
22 
23             if (uselow == true)
24             {
25                 str += "abcdefghijklmnopqrstuvwxyz";
26             }
27 
28             if (useupp == true)
29             {
30                 str += "abcdefghijklmnopqrstuvwxyz";
31             }
32 
33             if (usespe == true)
34             {
35                 str += "!\"#$%&'()*+,-./:;<=>?@[\\]^_`{|}~";
36             }
37 
38             for (int i = 0; i < length; i++)
39             {
40                 s += str.substring(r.next(0, str.length - 1), 1);
41             }
42 
43             return s;
44         }
view code

 

再建立一个接口,用于接受来自客户端的请求,根据请求异步创建一个任务,并将任务名称返回到请求客户端。

使用.NET Core搭建分布式音频效果处理服务(五)利用消息队列提升水平扩展灵活性
1             var taskname = audioparamfactory.getrandomstring(8, true, true, true, false, null);
2 
3             _imsgbusservice.pubilsh(jsonconvert.serializeobject(new
4             {
5                 frontfileurl,
6                 backgounedaudioindex,
7                 taskname
8             }), dispatchendpoint.media);
view code

 

上述代码中直接就两句话,一:建立一个任务名称;二:将消息发送到名为“media”的队列中。

为何创建连接,创建通道,配置等等都没有呢,这是因为在easyhub的框架中已经做好了,偷会懒吧o(∩_∩)o 。

通过请求8次,那么media队列中将存在8条消息,如图所示:

使用.NET Core搭建分布式音频效果处理服务(五)利用消息队列提升水平扩展灵活性

 

存储消费者处理后的状态

当消费完成,处理应答是必须的,否则这条消息会永远的存在消息服务器中。

使用.NET Core搭建分布式音频效果处理服务(五)利用消息队列提升水平扩展灵活性
 1 public void dostart()
 2         {
 3             // 1:从消息队列中取得需要处理的音频消息
 4             consumer consumer = new consumer(mqconfig.meidaqueuename);
 5             var channel = consumer.channel;
 6             consumer.receivedevent += (sender, args) =>
 7             {
 8                 var msg = encoding.utf8.getstring(args.body);
 9                 console.writeline(args.routingkey + "\r\n" + msg);
10                 console.writeline();
11 
12                 // 2:执行同步处理(一次只调用一个同步处理单元)
13                 var nonobj = jsonconvert.deserializeobject<dictionary<string, object>>(msg);
14                 var nonboy = jsonconvert.deserializeobject<dynamic>(nonobj["body"].tostring());
15                 string forntfileurl = nonboy.frontfileurl;
16                 int backgounedaudioindex = nonboy.backgounedaudioindex;
17                 string taskname = nonboy.taskname;
18                 // 调用同步方法
19                 var r = synthesisaudio(forntfileurl, backgounedaudioindex, taskname);
20                 console.writeline(r.gettype());
21                 console.writeline(typeof(audiosynthesissyncresult));
22                 if (r.gettype() == typeof(audiosynthesissyncresult))
23                 {
24                     // 3:处理完成,应答队列服务器
25                     channel.basicack(args.deliverytag, false);
26                     console.writeline(taskname);
27                     console.writeline("handler done, wait for the next message...");
28                 }
29                 else 
30                 {
31                     // 出现处理错误,则该条消息不做应答,并发送错误
32                     var error = ((jsonresult) r);
33                     console.writeline(error.statuscode);
34                     console.writeline(error.value);
35                 }
36             };
37         }
view code

当任务进入到消息队列,其实就和当时的请求是没有任何联系的了,这样来理解异步也不错,所以我们需要将任务的状态进行分类存储,以告诉客户端在查询的时候,当前的任务进行到哪一步了,我们可以用枚举的方式来罗列:

 1     public enum audioprocessingstate
 2     {
 3         emptyhandler = 0,
 4         starthandler = 1,
 5         downloadaudio = 2,
 6         synthesisaudio = 3,
 7         uploadaudio = 4,
 8         updatedatabase = 5,
 9         handlerexception = 6,
10         incompleted = 7
11     }

 

 笔者提供的任务状态有8种,具体时候请根据自己的业务逻辑进行区分,很简单,就是前面画的那张垂直流程图,不解释。

当然,如果你把所有任务状态都存到数据库,那么将会有个问题,这数据库面对轮询的压力有点吃力,所以最好还是放到缓存中,至于喜欢放什么缓存,这个根据业务场景和现有的而定,千万别放本地缓存就行。

对了,状态放缓存,而结果需要放数据库,这是原则问题。

 

客户端轮询结果接口

接下来我们在创建一个提供查询的接口,这里实际就是查询缓存而已,如果状态是incompleted,就直接从数据库取结果,因为非常的简单,笔者就不放代码上来了。

不过有朋友喜欢将结果进行推送到客户端,这也是非常好的,而且相比轮询,推送更能减少服务器压力。

 

测试结果

为了验证结果,笔者前前后后进行了多次的测试,在i7-2700k的win10上面模拟了多台服务器,看看这截图:

使用.NET Core搭建分布式音频效果处理服务(五)利用消息队列提升水平扩展灵活性

能分离的全都分离,包括请求和查询也单列一台服务器。

使用.NET Core搭建分布式音频效果处理服务(五)利用消息队列提升水平扩展灵活性

经过测试,笔者通过模拟请求8个任务,采用逐级增加服务的方式,得到了如下的结果:

单机  最快(最早入队)/ms
 最慢(最晚入队)/ms
第一次  3241  19430
第二次  3271  19592
第三次  4564  19227
两台    
第一次  4058  9819
第二次  3146  9014
第三次  4033  8798
三台    
第一次  3880  9830
第二次  3477  7700
第三次  3182  6993
六台    
第一次  3709  4800
第二次  3313  4773
第三次  3182  4793

最早入队的任务时间基本锁定在3-4s,为何会有这么大的波动,毕竟笔者的电脑不是真正的服务器电脑。而反观最晚入队的任务,在单机模式上,达到了19s,随着逐级的增加服务(笔者电脑开6个已经吃不消了),达到了不到5s,整体时间缩短了近4倍,结果非常令人满意。

下一节将介绍在netcore中如何使用中间件自动启动任务调度,而不是采用quartz中间件。

 

感谢阅读