使用.NET Core搭建分布式音频效果处理服务(四)选择垂直扩展还是水平扩展?
众所周知垂直扩展是提升单机的性能的方式,比如提升双路、四路的CPU运算能力,加大内存,更换速度更快的SSD,或者从代码根本上进行优化和性能提升。水平扩展是提供多台多种服务器分离单机性能的方式,比如集群,主从,队列,负载平衡等等。
白话的垂直扩展
现在服务器都是云服务器,单纯从单机的硬件性能提升整体性能,可能已经不太适用,而从代码上,其实还有些功课可以做,即使不多:
- 优化多线程协调模式,优化多线程下资源共享问题,以免出现奇怪的运行时错误。
- 改同步为异步:此方法提升的是吞吐率,性能并不能提升,不过对于客户端响应也算是件好事吧。
- 使用磁盘预读模式,极小幅度提升IO性能。
- 使用单机任务队列,强制任务有序进行:此方法在单机上不会提升性能,甚至会减少原本的单机吞吐率,但是却能保证任务在同一时刻的完整性。
我们先从垂直扩展中压榨单机的性能,同时还要保证稳定性,甚至稳定性比单机极限性能更加重要,为何?因为多线程(web服务器都是多线程模型)资源互斥问题,会让你查找问题的时候抓狂(当然,如果你要访问的资源只是单个,就另当别论了)。因此,很多时候我们通常会加锁来避免这类事情发生(锁的问题和功能我们这里不讨论),虽然牺牲了性能,但却换来了每次高请求所带来的稳定性。
其次,我们知道,web服务器都属于多线程模型,这样设计的目的是为了提高该服务器的整体吞吐量(不同服务器语言采用不同的线程开辟模式,例如java使用的是系统级的线程),当一个线程正在接近满负荷的处理当前的任务,紧接着马上又来一个请求(系统不会因为当前正在运行任务而终止新的请求),那么将是雪上加霜的,多个任务同时长时间在抢占同一个CPU资源,无疑是对整体影响甚大的。
言归正传,我们在单机上面针对这类问题,既要尽可能的减少处理时间,又要绝对保证整体运行期间的稳定性(后期会介绍如何使用熔断机制提升多台服务器系统的整体稳定性)。
上一节,我们已经创建了一个同步的接口,下面我们将这个接口稍作改动,使其成为包含异步任务方法的接口,整体代码就不贴上来了,以免影响篇幅
- 如果你喜欢手动创建与管理任务,那么你可以new一个Task<TResult>实例。
- 如果你喜欢让系统为你管理该任务状态,那么你可以Task.Factary.StartNew来新建一个实例。
白话的水平扩展
当一条街道上的小区越来越多,用水越来越大,而住户反应水压却越来越小,你是考虑增加主管道通水量大小、还是考虑增加每个小区的增压泵的功率、还是考虑增加主管道的数量(目前笔者小区就遇到水压不够的情况)。
在软件工程项目中,其实服务器TPS跟水压是同一个概念。
- 加大主水管道(如同提升CPU、内存)始终会有一个极限;
- 增加每个小区增压泵的功率(如同客户端使用大量的轮询,可主管道出口就只有那么点点量)始终要求比得到的多得多;
因此换句话说:
- 增加服务器数量(毕竟服务器比自来水厂容易建设:-)),提升管道入口的处理能力;
- 增加不同服务器类型,例如队列服务器,负载服务器,缓存服务器等等中间服务器,分摊和分离不同功能分到而行,如同主管道的分流阀,节流阀,增压泵等等;
- 增加带宽(这个是肯定的,提升TPS带宽肯定也是主要的);
一:点对点——原装
当然,如果接口已经成为了异步模式(本质其实是提前返回请求,但并没有返回请求所处理的结果),那么还需要一个接口来告诉客户端处理的结果,客户端通过该接口的轮询获得实时的结果。
在笔者介绍的这个服务中,流程架构如下:
非常简单的点对点模式,用户请求一次,等待服务器处理响应完成后释放,所有内容均采用同步方式进行,得到结果是:
非常简单的点对点模式,用户请求一次,等待服务器处理响应完成后释放,所有内容均采用同步方式进行,得到结果是:
用户等待时间 = 服务器处理时间
如果用户上传10秒,而服务器处理需要4秒,那么这个等待对于用户来说,是极为煎熬的。
也许聪明的你会说,在客户端给个友好的提示,比如让一个“风火轮”不停的转动,当处理完成后隐藏掉。的确,这从另一个角度上看确实也行得通(比如成本因素),但我们不讨论用户的视觉和等待等感觉上的东西,只讨论从技术上如何让这个响应时间更快,能快到几乎让用户察觉不到。
二:使用并行任务——小幅提升
单机并行模式大家应该都明白,毕竟现在CPU都是多核的了,干嘛要让其他CPU闲着呢,不管是JAVA还是C#,目前主流语言都可以完美的执行并行任务(python开多进程其实也算),各种语法请自行Google,既然文章标题是Net,那么笔者就少量的复制一下C#的代码。
Parallel.Invoke(() => { },() => { });
哇撒,真的很少,就是C#中的一个并行执行的语句而已,自己需要并行执行的代码放入花括号中就行,换成流程结构图如下:
画的很搓,欢迎拍砖。
笔者采用的CPU是I7-2700K,并行任务状态确实使用了起来,但减少时间却只有1秒,很不可思议,或许是笔者的代码优化不够好吧(并行原理和理想结果为何有出入请自行Google),所以就不毛遂自荐的贴上来了:-),但是这个3秒时间我会跟他死磕到底——用户不能等。
三:分离用户请求和耗时处理——异步
当朋友们看到这里的时候,或许心里早就想到用异步的方式来实现C/S的接口请求了,对,但我们还是需要走一下流程,梳理一下思路。请继续接着看。
异步其实就是多线程,只是目前由于高级语言的发展,已经将线程的难点给隐藏掉了,在一个请求主线程中,新建一个异步线程(或任务),分离主线程的长时间处理耗时,将这块难啃的骨头交给子线程去做,自己只管轻松的执行到return,是的确很舒服哦(笔者也梦想拥有这样的码砖方式,o(∩_∩)o 哈哈),new一个线程我们不做介绍,毕竟他的管理模式是纯手动的、并且是复杂的,我们只介绍new一个任务来分离主线程之间的关联。
正如之前提到,微软巴巴已经将这种模型给封装好了,只需要在接口处理函数内、将处理模块塞进Task中即可,不用再去new一个线程、管理这个线程的状态、什么时候调度、什么时候阻塞等等一些较底层的操作。万事有好必有坏,多线程模型创建是很简单了,相应的实现细节对于很多入门的朋友就看不懂了。
不过,当一个对外接口(或者内部函数)采用异步模式,那么调用端也需要进行轮询(异步同步无所谓,看调用如何实现)处理结果,这个模式相比原来常规同步复杂许多,需要建立任务、执行任务、存储任务状态和结果等等,不废话,上图:
通过将“服务器处理耗时”进行分离,请求主线程只需要将相应的参数传递给子线程(或任务),主线程就直接返回到客户端,如果忽略子任务之前的逻辑时间复杂度,完全可以达到瞬间返回到客户端,具体时长根据不同的平台和架构不同而不同,正如之前国外有人对NET CORE和GO进行过空业务响应对比(具体链接得找找),在请求数高于100W(包括并发)和没有任何逻辑代码的前提下,直接请求某个接口,NET CORE只比GO慢了近40ms(相同单机)。这样的性能还是非常看好的。
另外,如果处理时间过长,而且子任务不能及时返回,那将产生越来越多的任务阻塞,毕竟一个CPU是有极限的,并且伴随着或多或少的运行时错误,而这种错误是最让我们程序员头疼的,因此,这时我们需要加入单机队列,来限制和防止处理和请求达到瞬时波峰,文章结尾提供一份单机队列的代码供大家参考:
实际证明,这样对于用户来说,是瞬间的,不用等待的,极大的提升了用户体验。不过呢,如果请求数越多,那么越后进来的请求,等待的时间将越长,对于客户端轮询的时间也将变得更长。
轮询时间 = 请求数(单机队列数) * 单个处理耗时时间
好像比原来的点对点更糟糕了,实际我们根据这个架构进行扩展,将得到更好的体验,请继续接着看。
四:让多台机器一起工作吧——集群
先看张图:
哇塞,一下子变得这么复杂,好捉急啊。其实并不难理解,我们来看一看做了哪些变化:
- 单机的队列扩展为了使用服务器做队列集群;
- 增加调度任务;
- 将多个处理服务分配到多台机器上运行;
- 单机缓存增加到缓存集群;
其他也就没什么花头了。当请求任务过高,放入队列中,分离前级请求和后级处理,后级处理服务器的数量将直接影响整个平台的异步处理时间。如果非要对比单机模式,性能是随处理服务器的数量增加而提高的。下一节我们将详细讨论这套架构方案。
感谢阅读!
(附上单机队列的实现,仅供参考)
1 /// <summary> 2 /// 异步任务队列 3 /// </summary> 4 public class AsyncTaskQueue : IDisposable 5 { 6 private bool _isDisposed; 7 private readonly ConcurrentQueue<AwaitableTask> _queue = new ConcurrentQueue<AwaitableTask>(); 8 private Thread _thread; 9 private AutoResetEvent _autoResetEvent; 10 11 /// <summary> 12 /// 异步任务队列 13 /// </summary> 14 public AsyncTaskQueue() 15 { 16 _autoResetEvent = new AutoResetEvent(false); 17 _thread = new Thread(InternalRuning) {IsBackground = true}; 18 _thread.Start(); 19 } 20 21 private bool TryGetNextTask(out AwaitableTask task) 22 { 23 task = null; 24 while (_queue.Count > 0) 25 { 26 if (_queue.TryDequeue(out task) && (!AutoCancelPreviousTask || _queue.Count == 0)) return true; 27 task.Cancel(); 28 } 29 30 return false; 31 } 32 33 private AwaitableTask PenddingTask(AwaitableTask task, int maxQueueCount = 1000) 34 { 35 lock (_queue) 36 { 37 if (_queue.Count >= maxQueueCount) 38 { 39 throw new Exception($"超出最大队列数量,maxQueueCount={maxQueueCount}"); 40 } 41 42 Debug.Assert(task != null); 43 _queue.Enqueue(task); 44 _autoResetEvent.Set(); 45 } 46 47 return task; 48 } 49 50 private void InternalRuning() 51 { 52 while (!_isDisposed) 53 { 54 if (_queue.Count == 0) 55 { 56 _autoResetEvent.WaitOne(); 57 } 58 59 while (TryGetNextTask(out var task)) 60 { 61 if (task.IsCancel) continue; 62 63 if (UseSingleThread) 64 { 65 task.RunSynchronously(); 66 } 67 else 68 { 69 task.Start(); 70 } 71 } 72 } 73 } 74 75 /// <summary> 76 /// 是否使用单线程完成任务. 77 /// </summary> 78 public bool UseSingleThread { get; set; } = true; 79 80 /// <summary> 81 /// 自动取消以前的任务。 82 /// </summary> 83 public bool AutoCancelPreviousTask { get; set; } = false; 84 85 /// <summary> 86 /// 执行任务 87 /// </summary> 88 /// <param name="action"></param> 89 /// <param name="maxQueueCount"></param> 90 /// <returns></returns> 91 public AwaitableTask Run(Action action, int maxQueueCount = 1000) 92 => PenddingTask(new AwaitableTask(new Task(action, new CancellationToken(false))), maxQueueCount); 93 94 /// <summary> 95 /// 执行任务 96 /// </summary> 97 /// <typeparam name="TResult"></typeparam> 98 /// <param name="function"></param> 99 /// <param name="maxQueueCount"></param> 100 /// <returns></returns> 101 public AwaitableTask<TResult> Run<TResult>(Func<TResult> function, int maxQueueCount = 1000) 102 => (AwaitableTask<TResult>) PenddingTask(new AwaitableTask<TResult>(new Task<TResult>(function)), 103 maxQueueCount); 104 105 106 /// <inheritdoc /> 107 public void Dispose() 108 { 109 Dispose(true); 110 GC.SuppressFinalize(this); 111 } 112 113 /// <summary> 114 /// 析构任务队列 115 /// </summary> 116 ~AsyncTaskQueue() => Dispose(false); 117 118 private void Dispose(bool disposing) 119 { 120 if (_isDisposed) return; 121 if (disposing) 122 { 123 _autoResetEvent.Dispose(); 124 } 125 126 _thread = null; 127 _autoResetEvent = null; 128 _isDisposed = true; 129 } 130 131 /// <summary> 132 /// 可等待的任务 133 /// </summary> 134 public class AwaitableTask 135 { 136 private readonly Task _task; 137 138 /// <summary> 139 /// 初始化可等待的任务。 140 /// </summary> 141 /// <param name="task"></param> 142 public AwaitableTask(Task task) => _task = task; 143 144 /// <summary> 145 /// 任务的Id 146 /// </summary> 147 public int TaskId => _task.Id; 148 149 /// <summary> 150 /// 任务是否取消 151 /// </summary> 152 public bool IsCancel { get; private set; } 153 154 /// <summary> 155 /// 开始任务 156 /// </summary> 157 public void Start() => _task.Start(); 158 159 /// <summary> 160 /// 同步执行开始任务 161 /// </summary> 162 public void RunSynchronously() => _task.RunSynchronously(); 163 164 /// <summary> 165 /// 取消任务 166 /// </summary> 167 public void Cancel() => IsCancel = true; 168 169 /// <summary> 170 /// 获取任务等待器 171 /// </summary> 172 /// <returns></returns> 173 public TaskAwaiter GetAwaiter() => new TaskAwaiter(this); 174 175 /// <summary>Provides an object that waits for the completion of an asynchronous task. </summary> 176 [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)] 177 public struct TaskAwaiter : INotifyCompletion 178 { 179 private readonly AwaitableTask _task; 180 181 /// <summary> 182 /// 任务等待器 183 /// </summary> 184 /// <param name="awaitableTask"></param> 185 public TaskAwaiter(AwaitableTask awaitableTask) => _task = awaitableTask; 186 187 /// <summary> 188 /// 任务是否完成. 189 /// </summary> 190 public bool IsCompleted => _task._task.IsCompleted; 191 192 /// <inheritdoc /> 193 public void OnCompleted(Action continuation) 194 { 195 var This = this; 196 _task._task.ContinueWith(t => 197 { 198 if (!This._task.IsCancel) continuation?.Invoke(); 199 }); 200 } 201 202 /// <summary> 203 /// 获取任务结果 204 /// </summary> 205 public void GetResult() => _task._task.Wait(); 206 } 207 } 208 209 /// <summary> 210 /// 可等待的任务 211 /// </summary> 212 /// <typeparam name="TResult"></typeparam> 213 public class AwaitableTask<TResult> : AwaitableTask 214 { 215 /// <summary> 216 /// 初始化可等待的任务 217 /// </summary> 218 /// <param name="task">需要执行的任务</param> 219 public AwaitableTask(Task<TResult> task) : base(task) => _task = task; 220 221 222 private readonly Task<TResult> _task; 223 224 /// <summary> 225 /// 获取任务等待器 226 /// </summary> 227 /// <returns></returns> 228 public new TaskAwaiter GetAwaiter() => new TaskAwaiter(this); 229 230 /// <summary> 231 /// 任务等待器 232 /// </summary> 233 [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)] 234 public new struct TaskAwaiter : INotifyCompletion 235 { 236 private readonly AwaitableTask<TResult> _task; 237 238 /// <summary> 239 /// 初始化任务等待器 240 /// </summary> 241 /// <param name="awaitableTask"></param> 242 public TaskAwaiter(AwaitableTask<TResult> awaitableTask) => _task = awaitableTask; 243 244 /// <summary> 245 /// 任务是否已完成 246 /// </summary> 247 public bool IsCompleted => _task._task.IsCompleted; 248 249 /// <inheritdoc /> 250 public void OnCompleted(Action continuation) 251 { 252 var This = this; 253 _task._task.ContinueWith(t => 254 { 255 if (!This._task.IsCancel) continuation?.Invoke(); 256 }); 257 } 258 259 /// <summary> 260 /// 获取任务结果 261 /// </summary> 262 /// <returns></returns> 263 public TResult GetResult() => _task._task.Result; 264 } 265 } 266 }