欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

使用.NET Core搭建分布式音频效果处理服务(四)选择垂直扩展还是水平扩展?

程序员文章站 2022-03-17 09:51:25
众所周知垂直扩展是提升单机的性能的方式,比如提升双路、四路的CPU运算能力,加大内存,更换速度更快的SSD,或者从代码根本上进行优化和性能提升。水平扩展是提供多台多种服务器分离单机性能的方式,比如集群,主从,队列,负载平衡等等。 白话的垂直扩展 现在服务器都是云服务器,单纯从单机的硬件性能提升整体性 ......

使用.NET Core搭建分布式音频效果处理服务(四)选择垂直扩展还是水平扩展?

众所周知垂直扩展是提升单机的性能的方式,比如提升双路、四路的CPU运算能力,加大内存,更换速度更快的SSD,或者从代码根本上进行优化和性能提升。水平扩展是提供多台多种服务器分离单机性能的方式,比如集群,主从,队列,负载平衡等等。

 

 白话的垂直扩展

现在服务器都是云服务器,单纯从单机的硬件性能提升整体性能,可能已经不太适用,而从代码上,其实还有些功课可以做,即使不多:

  1. 优化多线程协调模式,优化多线程下资源共享问题,以免出现奇怪的运行时错误。
  2. 改同步为异步:此方法提升的是吞吐率,性能并不能提升,不过对于客户端响应也算是件好事吧。
  3. 使用磁盘预读模式,极小幅度提升IO性能。
  4. 使用单机任务队列,强制任务有序进行:此方法在单机上不会提升性能,甚至会减少原本的单机吞吐率,但是却能保证任务在同一时刻的完整性。

我们先从垂直扩展中压榨单机的性能,同时还要保证稳定性,甚至稳定性比单机极限性能更加重要,为何?因为多线程(web服务器都是多线程模型)资源互斥问题,会让你查找问题的时候抓狂(当然,如果你要访问的资源只是单个,就另当别论了)。因此,很多时候我们通常会加锁来避免这类事情发生(锁的问题和功能我们这里不讨论),虽然牺牲了性能,但却换来了每次高请求所带来的稳定性。

其次,我们知道,web服务器都属于多线程模型,这样设计的目的是为了提高该服务器的整体吞吐量(不同服务器语言采用不同的线程开辟模式,例如java使用的是系统级的线程),当一个线程正在接近满负荷的处理当前的任务,紧接着马上又来一个请求(系统不会因为当前正在运行任务而终止新的请求),那么将是雪上加霜的,多个任务同时长时间在抢占同一个CPU资源,无疑是对整体影响甚大的。

言归正传,我们在单机上面针对这类问题,既要尽可能的减少处理时间,又要绝对保证整体运行期间的稳定性(后期会介绍如何使用熔断机制提升多台服务器系统的整体稳定性)。

上一节,我们已经创建了一个同步的接口,下面我们将这个接口稍作改动,使其成为包含异步任务方法的接口,整体代码就不贴上来了,以免影响篇幅

  1. 如果你喜欢手动创建与管理任务,那么你可以new一个Task<TResult>实例。
  2. 如果你喜欢让系统为你管理该任务状态,那么你可以Task.Factary.StartNew来新建一个实例。

 

 白话的水平扩展

当一条街道上的小区越来越多,用水越来越大,而住户反应水压却越来越小,你是考虑增加主管道通水量大小、还是考虑增加每个小区的增压泵的功率、还是考虑增加主管道的数量(目前笔者小区就遇到水压不够的情况)。

在软件工程项目中,其实服务器TPS跟水压是同一个概念。

  1. 加大主水管道(如同提升CPU、内存)始终会有一个极限;
  2. 增加每个小区增压泵的功率(如同客户端使用大量的轮询,可主管道出口就只有那么点点量)始终要求比得到的多得多;

因此换句话说:

  1. 增加服务器数量(毕竟服务器比自来水厂容易建设:-)),提升管道入口的处理能力;
  2. 增加不同服务器类型,例如队列服务器,负载服务器,缓存服务器等等中间服务器,分摊和分离不同功能分到而行,如同主管道的分流阀,节流阀,增压泵等等;
  3. 增加带宽(这个是肯定的,提升TPS带宽肯定也是主要的);

 

一:点对点——原装

当然,如果接口已经成为了异步模式(本质其实是提前返回请求,但并没有返回请求所处理的结果),那么还需要一个接口来告诉客户端处理的结果,客户端通过该接口的轮询获得实时的结果。

在笔者介绍的这个服务中,流程架构如下:

  非常简单的点对点模式,用户请求一次,等待服务器处理响应完成后释放,所有内容均采用同步方式进行,得到结果是:使用.NET Core搭建分布式音频效果处理服务(四)选择垂直扩展还是水平扩展?

 非常简单的点对点模式,用户请求一次,等待服务器处理响应完成后释放,所有内容均采用同步方式进行,得到结果是:

用户等待时间 = 服务器处理时间

如果用户上传10秒,而服务器处理需要4秒,那么这个等待对于用户来说,是极为煎熬的。

也许聪明的你会说,在客户端给个友好的提示,比如让一个“风火轮”不停的转动,当处理完成后隐藏掉。的确,这从另一个角度上看确实也行得通(比如成本因素),但我们不讨论用户的视觉和等待等感觉上的东西,只讨论从技术上如何让这个响应时间更快,能快到几乎让用户察觉不到。

 

二:使用并行任务——小幅提升

单机并行模式大家应该都明白,毕竟现在CPU都是多核的了,干嘛要让其他CPU闲着呢,不管是JAVA还是C#,目前主流语言都可以完美的执行并行任务(python开多进程其实也算),各种语法请自行Google,既然文章标题是Net,那么笔者就少量的复制一下C#的代码。

Parallel.Invoke(() => { },() => { });

 

哇撒,真的很少,就是C#中的一个并行执行的语句而已,自己需要并行执行的代码放入花括号中就行,换成流程结构图如下:

使用.NET Core搭建分布式音频效果处理服务(四)选择垂直扩展还是水平扩展?

画的很搓,欢迎拍砖。

笔者采用的CPU是I7-2700K,并行任务状态确实使用了起来,但减少时间却只有1秒,很不可思议,或许是笔者的代码优化不够好吧(并行原理和理想结果为何有出入请自行Google),所以就不毛遂自荐的贴上来了:-),但是这个3秒时间我会跟他死磕到底——用户不能等。

 

三:分离用户请求和耗时处理——异步

 当朋友们看到这里的时候,或许心里早就想到用异步的方式来实现C/S的接口请求了,对,但我们还是需要走一下流程,梳理一下思路。请继续接着看。

异步其实就是多线程,只是目前由于高级语言的发展,已经将线程的难点给隐藏掉了,在一个请求主线程中,新建一个异步线程(或任务),分离主线程的长时间处理耗时,将这块难啃的骨头交给子线程去做,自己只管轻松的执行到return,是的确很舒服哦(笔者也梦想拥有这样的码砖方式,o(∩_∩)o 哈哈),new一个线程我们不做介绍,毕竟他的管理模式是纯手动的、并且是复杂的,我们只介绍new一个任务来分离主线程之间的关联。

正如之前提到,微软巴巴已经将这种模型给封装好了,只需要在接口处理函数内、将处理模块塞进Task中即可,不用再去new一个线程、管理这个线程的状态、什么时候调度、什么时候阻塞等等一些较底层的操作。万事有好必有坏,多线程模型创建是很简单了,相应的实现细节对于很多入门的朋友就看不懂了。

不过,当一个对外接口(或者内部函数)采用异步模式,那么调用端也需要进行轮询(异步同步无所谓,看调用如何实现)处理结果,这个模式相比原来常规同步复杂许多,需要建立任务、执行任务、存储任务状态和结果等等,不废话,上图:

使用.NET Core搭建分布式音频效果处理服务(四)选择垂直扩展还是水平扩展?

通过将“服务器处理耗时”进行分离,请求主线程只需要将相应的参数传递给子线程(或任务),主线程就直接返回到客户端,如果忽略子任务之前的逻辑时间复杂度,完全可以达到瞬间返回到客户端,具体时长根据不同的平台和架构不同而不同,正如之前国外有人对NET CORE和GO进行过空业务响应对比(具体链接得找找),在请求数高于100W(包括并发)和没有任何逻辑代码的前提下,直接请求某个接口,NET CORE只比GO慢了近40ms(相同单机)。这样的性能还是非常看好的。

另外,如果处理时间过长,而且子任务不能及时返回,那将产生越来越多的任务阻塞,毕竟一个CPU是有极限的,并且伴随着或多或少的运行时错误,而这种错误是最让我们程序员头疼的,因此,这时我们需要加入单机队列,来限制和防止处理和请求达到瞬时波峰,文章结尾提供一份单机队列的代码供大家参考:

实际证明,这样对于用户来说,是瞬间的,不用等待的,极大的提升了用户体验。不过呢,如果请求数越多,那么越后进来的请求,等待的时间将越长,对于客户端轮询的时间也将变得更长。

轮询时间 = 请求数(单机队列数) * 单个处理耗时时间

好像比原来的点对点更糟糕了,实际我们根据这个架构进行扩展,将得到更好的体验,请继续接着看。

 

四:让多台机器一起工作吧——集群

先看张图:

使用.NET Core搭建分布式音频效果处理服务(四)选择垂直扩展还是水平扩展?

哇塞,一下子变得这么复杂,好捉急啊。其实并不难理解,我们来看一看做了哪些变化:

  1. 单机的队列扩展为了使用服务器做队列集群;
  2. 增加调度任务;
  3. 将多个处理服务分配到多台机器上运行;
  4. 单机缓存增加到缓存集群;

其他也就没什么花头了。当请求任务过高,放入队列中,分离前级请求和后级处理,后级处理服务器的数量将直接影响整个平台的异步处理时间。如果非要对比单机模式,性能是随处理服务器的数量增加而提高的。下一节我们将详细讨论这套架构方案。

 

感谢阅读!

 

(附上单机队列的实现,仅供参考)

使用.NET Core搭建分布式音频效果处理服务(四)选择垂直扩展还是水平扩展?
  1 /// <summary>
  2 /// 异步任务队列
  3 /// </summary>
  4 public class AsyncTaskQueue : IDisposable
  5 {
  6     private bool _isDisposed;
  7     private readonly ConcurrentQueue<AwaitableTask> _queue = new ConcurrentQueue<AwaitableTask>();
  8     private Thread _thread;
  9     private AutoResetEvent _autoResetEvent;
 10 
 11     /// <summary>
 12     /// 异步任务队列
 13     /// </summary>
 14     public AsyncTaskQueue()
 15     {
 16         _autoResetEvent = new AutoResetEvent(false);
 17         _thread = new Thread(InternalRuning) {IsBackground = true};
 18         _thread.Start();
 19     }
 20 
 21     private bool TryGetNextTask(out AwaitableTask task)
 22     {
 23         task = null;
 24         while (_queue.Count > 0)
 25         {
 26             if (_queue.TryDequeue(out task) && (!AutoCancelPreviousTask || _queue.Count == 0)) return true;
 27             task.Cancel();
 28         }
 29 
 30         return false;
 31     }
 32 
 33     private AwaitableTask PenddingTask(AwaitableTask task, int maxQueueCount = 1000)
 34     {
 35         lock (_queue)
 36         {
 37             if (_queue.Count >= maxQueueCount)
 38             {
 39                 throw new Exception($"超出最大队列数量,maxQueueCount={maxQueueCount}");
 40             }
 41 
 42             Debug.Assert(task != null);
 43             _queue.Enqueue(task);
 44             _autoResetEvent.Set();
 45         }
 46 
 47         return task;
 48     }
 49 
 50     private void InternalRuning()
 51     {
 52         while (!_isDisposed)
 53         {
 54             if (_queue.Count == 0)
 55             {
 56                 _autoResetEvent.WaitOne();
 57             }
 58 
 59             while (TryGetNextTask(out var task))
 60             {
 61                 if (task.IsCancel) continue;
 62 
 63                 if (UseSingleThread)
 64                 {
 65                     task.RunSynchronously();
 66                 }
 67                 else
 68                 {
 69                     task.Start();
 70                 }
 71             }
 72         }
 73     }
 74 
 75     /// <summary>
 76     /// 是否使用单线程完成任务.
 77     /// </summary>
 78     public bool UseSingleThread { get; set; } = true;
 79 
 80     /// <summary>
 81     /// 自动取消以前的任务。
 82     /// </summary>
 83     public bool AutoCancelPreviousTask { get; set; } = false;
 84 
 85     /// <summary>
 86     /// 执行任务
 87     /// </summary>
 88     /// <param name="action"></param>
 89     /// <param name="maxQueueCount"></param>
 90     /// <returns></returns>
 91     public AwaitableTask Run(Action action, int maxQueueCount = 1000)
 92         => PenddingTask(new AwaitableTask(new Task(action, new CancellationToken(false))), maxQueueCount);
 93 
 94     /// <summary>
 95     /// 执行任务
 96     /// </summary>
 97     /// <typeparam name="TResult"></typeparam>
 98     /// <param name="function"></param>
 99     /// <param name="maxQueueCount"></param>
100     /// <returns></returns>
101     public AwaitableTask<TResult> Run<TResult>(Func<TResult> function, int maxQueueCount = 1000)
102         => (AwaitableTask<TResult>) PenddingTask(new AwaitableTask<TResult>(new Task<TResult>(function)),
103             maxQueueCount);
104 
105 
106     /// <inheritdoc />
107     public void Dispose()
108     {
109         Dispose(true);
110         GC.SuppressFinalize(this);
111     }
112 
113     /// <summary>
114     /// 析构任务队列
115     /// </summary>
116     ~AsyncTaskQueue() => Dispose(false);
117 
118     private void Dispose(bool disposing)
119     {
120         if (_isDisposed) return;
121         if (disposing)
122         {
123             _autoResetEvent.Dispose();
124         }
125 
126         _thread = null;
127         _autoResetEvent = null;
128         _isDisposed = true;
129     }
130 
131     /// <summary>
132     /// 可等待的任务
133     /// </summary>
134     public class AwaitableTask
135     {
136         private readonly Task _task;
137 
138         /// <summary>
139         /// 初始化可等待的任务。
140         /// </summary>
141         /// <param name="task"></param>
142         public AwaitableTask(Task task) => _task = task;
143 
144         /// <summary>
145         /// 任务的Id
146         /// </summary>
147         public int TaskId => _task.Id;
148 
149         /// <summary>
150         /// 任务是否取消
151         /// </summary>
152         public bool IsCancel { get; private set; }
153 
154         /// <summary>
155         /// 开始任务
156         /// </summary>
157         public void Start() => _task.Start();
158 
159         /// <summary>
160         /// 同步执行开始任务
161         /// </summary>
162         public void RunSynchronously() => _task.RunSynchronously();
163 
164         /// <summary>
165         /// 取消任务
166         /// </summary>
167         public void Cancel() => IsCancel = true;
168 
169         /// <summary>
170         /// 获取任务等待器
171         /// </summary>
172         /// <returns></returns>
173         public TaskAwaiter GetAwaiter() => new TaskAwaiter(this);
174 
175         /// <summary>Provides an object that waits for the completion of an asynchronous task. </summary>
176         [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)]
177         public struct TaskAwaiter : INotifyCompletion
178         {
179             private readonly AwaitableTask _task;
180 
181             /// <summary>
182             /// 任务等待器
183             /// </summary>
184             /// <param name="awaitableTask"></param>
185             public TaskAwaiter(AwaitableTask awaitableTask) => _task = awaitableTask;
186 
187             /// <summary>
188             /// 任务是否完成.
189             /// </summary>
190             public bool IsCompleted => _task._task.IsCompleted;
191 
192             /// <inheritdoc />
193             public void OnCompleted(Action continuation)
194             {
195                 var This = this;
196                 _task._task.ContinueWith(t =>
197                 {
198                     if (!This._task.IsCancel) continuation?.Invoke();
199                 });
200             }
201 
202             /// <summary>
203             /// 获取任务结果
204             /// </summary>
205             public void GetResult() => _task._task.Wait();
206         }
207     }
208 
209     /// <summary>
210     /// 可等待的任务
211     /// </summary>
212     /// <typeparam name="TResult"></typeparam>
213     public class AwaitableTask<TResult> : AwaitableTask
214     {
215         /// <summary>
216         /// 初始化可等待的任务
217         /// </summary>
218         /// <param name="task">需要执行的任务</param>
219         public AwaitableTask(Task<TResult> task) : base(task) => _task = task;
220 
221 
222         private readonly Task<TResult> _task;
223 
224         /// <summary>
225         /// 获取任务等待器
226         /// </summary>
227         /// <returns></returns>
228         public new TaskAwaiter GetAwaiter() => new TaskAwaiter(this);
229 
230         /// <summary>
231         /// 任务等待器
232         /// </summary>
233         [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)]
234         public new struct TaskAwaiter : INotifyCompletion
235         {
236             private readonly AwaitableTask<TResult> _task;
237 
238             /// <summary>
239             /// 初始化任务等待器
240             /// </summary>
241             /// <param name="awaitableTask"></param>
242             public TaskAwaiter(AwaitableTask<TResult> awaitableTask) => _task = awaitableTask;
243 
244             /// <summary>
245             /// 任务是否已完成
246             /// </summary>
247             public bool IsCompleted => _task._task.IsCompleted;
248 
249             /// <inheritdoc />
250             public void OnCompleted(Action continuation)
251             {
252                 var This = this;
253                 _task._task.ContinueWith(t =>
254                 {
255                     if (!This._task.IsCancel) continuation?.Invoke();
256                 });
257             }
258 
259             /// <summary>
260             /// 获取任务结果
261             /// </summary>
262             /// <returns></returns>
263             public TResult GetResult() => _task._task.Result;
264         }
265     }
266 }
View Code