ElasticSearch 线程池类型分析之SizeBlockingQueue
elasticsearch 线程池类型分析之sizeblockingqueue
尽管前面写好几篇es线程池分析的文章(见文末参考链接),但都不太满意。但从es的线程池中了解到了不少java线程池的使用技巧,于是忍不住再写一篇(es6.3.2版本的源码)。文中给出的每个代码片断,都标明了这些代码是来自哪个类的哪个方法。
elasticsearch里面一共有四种类型的线程池,源码:threadpool.threadpooltype
direct("direct"), fixed("fixed"), fixed_auto_queue_size("fixed_auto_queue_size"), scaling("scaling");
get、search、write、index、flush...等各种操作是交由这些线程池实现的。为什么定义不同类型的线程池呢?举个最简单的例子:程序里面有io密集型任务,也有cpu密集型任务,这些任务都提交到一个线程池中执行?还是根据任务的执行特点将cpu密集型的任务都提交到一个线程池,io密集型任务都提交到另一个线程池执行?
不同种类的操作(index、search...)交由不同类型的线程池执行是有很多好处的:
- 互不影响:index操作频繁时,并不会影响search操作的执行。
- 资源合理利用(提升性能):如果只有一个线程池来处理所有的操作,线程池队列长度配置为多大合适?线程的数量配置多少合适?这些操作难道都要共用一个拒绝策略吗?线程执行过程中出现异常了,针对不同类型的操作,异常处理方案也是不一样的,显然:只有一个线程池(或者说只有一种类型的线程池),是无法满足这些需求的。
再来说一下es中的线程池都是如何创建的?
es节点启动时,执行node类的构造方法 :org.elasticsearch.node.node.node(org.elasticsearch.env.environment, java.util.collection<java.lang.class<? extends org.elasticsearch.plugins.plugin>>)final threadpool threadpool = new threadpool(settings, executorbuilders.toarray(new executorbuilder[0]));
new threadpool对象,从这里开始创建线程池。看懂了threadpool类,就理解了es线程池的一半。
每个操作都有一个线程池,每个线程池都有一个相应的 executorbuilder 对象,线程池都是通过executorbuilder类的build()方法创建的。
在org.elasticsearch.threadpool.threadpool.threadpool的构建函数里面创建各种executorbuilder对象。可以看出:index操作的线程池的 executorbuilder对象实际类型是fixedexecutorbuilder
builders.put(names.generic, new scalingexecutorbuilder(names.generic, 4, genericthreadpoolmax, timevalue.timevalueseconds(30))); builders.put(names.index, new fixedexecutorbuilder(settings, names.index, availableprocessors, 200, true)); builders.put(names.write, new fixedexecutorbuilder(settings, names.write, "bulk", availableprocessors, 200)); builders.put(names.get, new fixedexecutorbuilder(settings, names.get, availableprocessors, 1000)); builders.put(names.analyze, new fixedexecutorbuilder(settings, names.analyze, 1, 16)); builders.put(names.search, new autoqueueadjustingexecutorbuilder(settings, names.search, searchthreadpoolsize(availableprocessors), 1000, 1000, 1000, 2000));
如上代码所示,虽然es为我们内置好了许多线程池(generic、index、write、get...),但还可以自定义 executorbuilder对象,创建自定义的线程池。所有的executorbuilder对象创建完毕后,保存到一个hashmap里面。
for (final executorbuilder<?> builder : custombuilders) { if (builders.containskey(builder.name())) { throw new illegalargumentexception("builder with name [" + builder.name() + "] already exists"); } builders.put(builder.name(), builder); }
最后,遍历builders 这个hashmap 取出 executorbuilder对象,调用它的build()方法创建线程池
for (@suppresswarnings("unchecked") final map.entry<string, executorbuilder> entry : builders.entryset()) { final executorbuilder.executorsettings executorsettings = entry.getvalue().getsettings(settings); //这里执行build方法创建线程池 final executorholder executorholder = entry.getvalue().build(executorsettings, threadcontext); if (executors.containskey(executorholder.info.getname())) { throw new illegalstateexception("duplicate executors with name [" + executorholder.info.getname() + "] registered"); } logger.debug("created thread pool: {}", entry.getvalue().formatinfo(executorholder.info)); executors.put(entry.getkey(), executorholder); }
创建index操作的线程池需要指定任务队列,这个任务队列就是:sizeblockingqueue。当然了,也有一些其他操作(比如get操作)的线程池的任务队列也是sizeblockingqueue。
下面参数可看出:该任务队列的长度为200,org.elasticsearch.threadpool.threadpool.threadpool的构造方法:
builders.put(names.index, new fixedexecutorbuilder(settings, names.index, availableprocessors, 200, true));
前面已经提到了,每个线程池都由executorbuilder的build方法创建的。具体到index操作的线程池,它的executorbuilder实例对象是: fixedexecutorbuilder对象,在executorbuilder 保存一些线程池参数信息:(core pool size、max pool size、queue size...)
final executorservice executor = esexecutors.newfixed(settings.nodename + "/" + name(), size, queuesize, threadfactory, threadcontext);
如果queue_size配置为 -1,那就是一个*队列(linkedtransferqueue)。我们是可以修改线程池配置参数的:关于线程池队列长度的配置信息参考:
而index操作对应的线程池的任务队列长度为200,因此下面代码创建了一个长度为200的 sizeblockingqueue,在代码最后一行,为该线程池指定的拒绝策略是 esabortpolicy
public static esthreadpoolexecutor newfixed(string name, int size, int queuecapacity, threadfactory threadfactory, threadcontext contextholder) { blockingqueue<runnable> queue; if (queuecapacity < 0) { queue = concurrentcollections.newblockingqueue(); } else { queue = new sizeblockingqueue<>(concurrentcollections.<runnable>newblockingqueue(), queuecapacity); } return new esthreadpoolexecutor(name, size, size, 0, timeunit.milliseconds, queue, threadfactory, new esabortpolicy(), contextholder); }
下面开始分析sizeblockingqueue的源码:
一般在自定义线程池时,要么是直接 new threadpoolexecutor,要么是继承threadpoolexecutor,在创建threadpoolexecutor对象时需要指定线程池的配置参数。比如,线程池的核心线程数(core pool size),最大线程数,任务队列,拒绝策略。这里我想提一下拒绝策略,因为某些es的操作具有"强制"执行的特点:如果某个任务被标记为强制执行,那么向线程池提交该任务时,就不能拒绝它。是不是很厉害?想想,线程池是如何做到的?
下面举个例子:
//创建任务队列,这里没有指定任务队列的长度,那么这就是一个*队列 private blockingqueue<runnable> taskqueue = new linkedblockingqueue<>(); //创建线程工厂,由它来创建线程 private threadfactory threadfactory = new threadfactorybuilder().setnameformat("thread-%d").setuncaughtexceptionhandler(exceptionhandler).build(); //创建线程池,核心线程数为4,最大线程数为16 private threadpoolexecutor executor = new threadpoolexecutor(4, 16, 1, timeunit.days, taskqueue, threadfactory, rejectexecutionhandler);
这里创建的线程池,它的线程数量永远不可能达到最大线程数量16,为什么?因为我们的任务队列是一个*队列,当向线程池中提交任务时,linkedblockingqueue.offer方法不会返回false。而在jdk源码java.util.concurrent.threadpoolexecutor.execute中,当任务入队列失败返回false时,才有可能触发addwork创建新线程。这个时候,你可能会说:在 new linkedblockingqueue的时候指定队列长度不就完了?比如这样指定队列长度为1024
private blockingqueue<runnable> taskqueue = new linkedblockingqueue<>(1024);
但是,有没有一种方法,能够做到:当core pool size 个核心线程数处理不过来时,先让线程池的线程数量创建到最大值(max pool size),然后,若还有任务提交到线程池,则让任务排队等待处理?sizeblockingqueue 重写了blockingqueue的offer方法,实现了这个功能。
另外,我再反问一下?如何确定1024就是一个合适的队列容量?万一提交任务速度很快,一下子任务队列就满了,长度1024就会导致大量的任务被拒绝。
es中的 resizableblockingqueue 实现了一种可动态调整队列长度的任务队列,有兴趣的可以去研究一下。
sizeblockingqueue 封装了 linkedtransferqueue,而 linkedtransferqueue 是一个*队列,与linkedblockingqueue不同的是,linkedtransferqueue的构造方法是不能指定任务队列的长度(capacity)的。因此,sizeblockingqueue定义一个capacity属性提供了队列有界的功能。
好,来看看sizeblockingqueue是如何重写offer方法的:org.elasticsearch.common.util.concurrent.sizeblockingqueue.offer(e)
@override public boolean offer(e e) { while (true) { //获取当前任务队列的长度,即:当前任务队列里面有多少个任务正在排队等待执行 final int current = size.get(); //如果正在等待排队的任务数量大于等于任务队列长度的最大值(容量), //返回false 就有可能 触发 java.util.concurrent.threadpoolexecutor.addworker 调用创建新线程 if (current >= capacity()) { return false; } //当前正在排队的任务数量尚未超过队列的最大长度,使用cas 先将任务队列长度加1,[cas的经典用法] if (size.compareandset(current, 1 + current)) { break; } } //将任务添加到队列 boolean offered = queue.offer(e); if (!offered) { //如果未添加成功,再把数量减回去即可 size.decrementandget(); } return offered; }
上面,就是通过先判断当前排队的任务是否小于任务队列的最大长度(容量) 来实现:优先创建线程数量到 max pool size。下面来模拟一下使用 sizeblockingqueue 时处理任务的步骤:
根据前面的介绍:线程池 core pool size=4,max pool size=16,taskqueue 是 sizeblockingqueue,任务队列的最大长度是200
1,提交1-4号 四个任务给线程池,线程池创建4个线程处理这些任务
2,1-4号 四个任务正在执行中...此时又提交了8个任务到线程池
3,这时,线程池是再继续创建8个线程,处理 5-12号任务。此时,线程池中一共有4+8=12个线程,小于max pool size
4,假设 1-12号任务都正在处理中,此时又提交了8个任务到线程池
5,这时,线程池会再创建4个新线程处理其中的13-16号 这4个任务,线程数量已经达到max pool size,不能再创建新线程了,还有4个任务(17-20号)入队列排队等待。
有没有兴趣模拟一下使用linkedblockingqueue作为任务队列时,线程池又是如何处理这一共提交的20个任务的?
最后来分析一下 sizeblockingqueue 如何支持:当向线程池提交任务时,如果任务被某种拒绝策略拒绝了,如果这种任务又很重要,那能不能强制将该任务提交到线程池的任务队列中呢?
这里就涉及到:在创建线程池时,为线程池配置了何种拒绝策略了。下面以index操作的线程池为例说明:
在org.elasticsearch.common.util.concurrent.esexecutors.newfixed 中:可知该线程池所使用的拒绝策略是:esabortpolicy
return new esthreadpoolexecutor(name, size, size, 0, timeunit.milliseconds, queue, threadfactory, new esabortpolicy(), contextholder);
看 esabortpolicy 的源码:org.elasticsearch.common.util.concurrent.esabortpolicy.rejectedexecution
if (r instanceof abstractrunnable) { //判断该任务是不是一个 可强制提交的任务 if (((abstractrunnable) r).isforceexecution()) { blockingqueue<runnable> queue = executor.getqueue(); if (!(queue instanceof sizeblockingqueue)) { throw new illegalstateexception("forced execution, but expected a size queue"); } //是一个可强制提交的任务,并且 线程池的任务队列是 sizeblockingqueue时,强制提交任务 try { ((sizeblockingqueue) queue).forceput(r); } catch (interruptedexception e) { thread.currentthread().interrupt(); throw new illegalstateexception("forced execution, but got interrupted", e); } return; } } rejected.inc(); //任务被拒绝且未能强制执行, 抛出esrejectedexecutionexception异常后,会被 esthreadpoolexecutor.doexecute catch, 进行相应的处理 throw new esrejectedexecutionexception("rejected execution of " + r + " on " + executor, executor.isshutdown());
abstractrunnable 是提交的runnable任务,只要runnable任务的 isforceexecution()返回true,就表明这个任务需要“强制提交”。关于abstractrunnable,可参考:elasticsearch中各种线程池分析
那为什么只有当任务队列是 sizeblockingqueue 时,才可以强制提交呢?这很好理解:首先sizeblockingqueue它封装了linkedtransferqueue,linkedtransferqueue本质上是一个*队列,实际上可以添加无穷多个任务(不考虑oom),只不过是用 capacity 属性限制了队列的长度而已。
如果,任务队列是 new linkedblockingqueue<>(1024)
,肯定是不能支持强制提交的,因为当linkedblockingqueue长度达到1024后,再提交任务,直接返回false了。从这里也可以借鉴es线程池任务队列的设计方式,应用到项目中去。
综上:只有runnable任务 isforceexecution返回true,并且线程池的任务队列是sizeblockingqueue时,向线程池提交任务时,总是能提交成功(强制执行机制保证)。其他情况下,任务被拒绝时,会抛出esrejectedexecutionexception异常。
强制提交,把任务添加到任务队列 sizeblockingqueue 中,源码如下:
org.elasticsearch.common.util.concurrent.sizeblockingqueue.forceput
/** * forces adding an element to the queue, without doing size checks. */ public void forceput(e e) throws interruptedexception { size.incrementandget(); try { queue.put(e); } catch (interruptedexception ie) { size.decrementandget(); throw ie; } }
总结:
es会为每种操作创建一个线程池,本文基于index操作分析了es中线程池的任务队列sizeblockingqueue。对于 index 操作而言,它的线程池是由org.elasticsearch.threadpool.fixedexecutorbuilder 的build方法创建的,线程池的最大核心线程数和最大线程数相同,使用的任务队列是 sizeblockingqueue,长度为200,拒绝策略是:org.elasticsearch.common.util.concurrent.esabortpolicy。
sizeblockingqueue 本质上是一个 linkedtransferqueue,其实es中所有的任务队列都是封装linkedtransferqueue实现的,并没有使用linkedblockingqueue。
es中的所有任务(runnable)都是基于org.elasticsearch.common.util.concurrent.abstractrunnable这个抽象类封装的,当然有一些任务是通过lambda表达式的形式提交的。任务的具体处理逻辑在 org.elasticsearch.common.util.concurrent.abstractrunnable#dorun 方法中,任务执行完成由onafter()处理,执行出现异常由onfailure()处理。线程池的 org.elasticsearch.common.util.concurrent.esthreadpoolexecutor#doexecute 方法 里面就是整个任务的处理流程:
protected void doexecute(final runnable command) { try { super.execute(command); } catch (esrejectedexecutionexception ex) { if (command instanceof abstractrunnable) { // if we are an abstract runnable we can handle the rejection // directly and don't need to rethrow it. try { ((abstractrunnable) command).onrejection(ex); } finally { ((abstractrunnable) command).onafter(); } } else { throw ex; } } }
最后,es的线程池模块代码主要在 org.elasticsearch.threadpool 和 org.elasticsearch.common.util.concurrent 包下。总体来说,threadpool模块相比于es的其他模块,是一个小模块,代码不算复杂。但是threadpool又很重要,因为它是其他模块执行逻辑的基础,threadpool 再配上异步执行机制,是es源码中其他操作的源码实现思路。
上一篇: bootStrap表单验证插件的使用
下一篇: 五胡十六国争霸是怎么开始?怎么结束的?