Java中如何限制方法访问的并发数
并发编程一直是Java基础中的高地,但很多只要有两三年Java基础的工程师,会在简历中很明确的标明“熟悉多线程编程、有高并发编程经验”,来凸显自己编程能力有多厉害,但实际上可能只是看了《Java Concurrency in Practice》的几个章节而已。其实对很多业务研发工程师来说,高并发编程经验并不是必备的核心竞争力之一,很多需要加锁或者统计的场景,大都可以交给外部系统如Redis来做,即多线程并发场景的转移。
那么作为面试官,如何简单快速考察面试者的多线程编程能力?可能方法有很多,笔者喜欢用到的一个题目如下:“对于单个Java应用,我们如何限制其中某个方法methodA()被调用的并发数不能超过100,如果超过100,超出的请求就直接返回null或抛异常”。笔者会要求面试者在白板上写出相应代码片段(当然并不会要求面试者一定要完整写出用到的类名或方法名)。
这个题看起来并不难,但仔细想想也不那么简单,大约30%的面试者会给出Semaphore的解决方案,例如:
private static Semaphore semaphore = new Semaphore(100); public static Integer methodA() { if(!semaphore.tryAcquire()) { return null; } try { // TODO 方法中的业务逻辑 } finally { semaphore.release(); } }
Semaphore信号量是一种比较完美的解决方案,代码简单而高效,一旦面试者给出这个方案,我们可以顺便考察下信号量相关的知识点。
但还有没有其他的思路或解决方案呢?笔者接触到的面试者,还给出过线程池的方案,因为最高并发数是100,那么表明同时最多只能有100个线程访问该方法,面试者一般会这么写:
private final static ExecutorService pool = new ThreadPoolExecutor(100, 100, 1, TimeUnit.MINUTES, new SynchronousQueue<>()); public static Integer methodAWrapper() { try { Future<Integer> future = pool.submit(() -> methodA()); return future.get(); } catch (Exception e) { return null; } } public static Integer methodA() { // TODO 方法中的业务逻辑 }
其实严格意义来讲,线程池的这种方案无法完美做到“如果超过100,超出的请求就直接返回null或抛异常”,哪怕是使用SynchronousQueue队列。但没关系,最关键的考察点并不在超限如何返回。当面试者写出这种方案,也可以顺便考察下线程池相关的知识点。
大多数面试者想到的是写个计数器,例如:
private static AtomicInteger counter = new AtomicInteger(0); public static Integer methodA() { int value = counter.incrementAndGet(); if(value > 100) { return null; } try { // TODO 方法中的业务逻辑 } finally { counter.decrementAndGet(); } }
于是我一般会问,为啥是选择incrementAndGet方法,而不是选择getAndIncrement?仔细看看会不会有其他问题?大多数面试者经过提示都能发现这里get、incr和比较不是原子操作,会产生“竞态条件”(race condition)。我们先把这种计数器方案叫做方案A,像想到计数器方案的面试者,也有小部分会这样写:
private static AtomicInteger counter = new AtomicInteger(0); public static Integer methodA() { int value = counter.get(); if(value > 100) { return null; } counter.incrementAndGet(); try { // TODO 方法中的业务逻辑 } finally { counter.decrementAndGet(); } }
我们把这种计数器的实现方案叫做方案B,这两种方案都会有“竞态条件”的问题,但产生的现象不一样。
对于方案A,在极端高并发的情况下,每个调用methodA的请求,都会对计数器进行+1,即使我们在finally对计数器进行了-1,也阻止value的值继续上涨,导致远大于100,得到的结果是所有请求没机会执行业务逻辑,即“饿死”现象。
对于方案B,由于是活的执行业务逻辑的许可后再进行的+1操作,很显然在高并发情况下会导致执行业务逻辑的线程数超过100。
很多Java老司机觉得自己不会犯这种错误,但实际上,最近阿里的开源项目Sentinel就有类似方案A的问题,Sentinel中使用责任链的模式,来对每笔调用进行统计、拦截,这里给出构建责任链DefaultSlotsChainBuilder类的代码片段:
public class DefaultSlotsChainBuilder implements SlotsChainBuilder { @Override public ProcessorSlotChain build() { ProcessorSlotChain chain = new DefaultProcessorSlotChain(); chain.addLast(new NodeSelectorSlot()); chain.addLast(new ClusterBuilderSlot()); chain.addLast(new LogSlot()); chain.addLast(new StatisticSlot()); // 统计数据,类似于上文提到的incrementAndGet chain.addLast(new SystemSlot()); // 检查是否超过阈值,类似于上文提到的value > 100 chain.addLast(new AuthoritySlot()); chain.addLast(new FlowSlot()); chain.addLast(new DegradeSlot()); return chain; } }
而在分布式服务框架Dubbo的早期版本(例如2.5.3),在对Provider提供线程限制保护的executes
(例如:<dubbo:service interface="com.manzhizhen.dubbo.server.service.Dubbo2Service"
ref="dubbo2Service" version="1.0.0" timeout="3000" executes="10" />)的实现方案,就踩了上述方案B的坑。
回归正题,也有面试者给出了阻塞队列的方案,即:
private static BlockingQueue<Integer> reqQueue = new ArrayBlockingQueue<>(100); public static Integer methodA() { if(!reqQueue.offer()) { return null; } try { // TODO 方法中的业务逻辑 } finally { reqQueue.poll(); } }
阻塞队列的方案也是不错的,代码简单,效率也还行。
当然,也有面试者说用Hystrix,嗯嗯,这也是可以的。
做个总结吧,最优方案当然是使用Semaphore,但Semaphore无法统计实际高峰期时的并发量有多少(很多场景需要通过实际最高的并发值来优化我们系统),文章很短,希望对大家能有所帮助。