并发编程——彻底掌握CountDownLatch,CyclicBarrier和Semaphore
一级目录
二级目录
三级目录
CountDownLatch
见前一篇:并发编程——一文看尽CountDownLatch原理
CyclicBarrier 作用
CountDownLatch是某个线程A执行的过程中,等待其他的一个或多个线程全部执行完毕,线程A再继续执行当前的流程,起到一个同步等待的作用。
CyclicBarrier(循环壁垒)同样起到一个同步等待的作用的,但它的作用是让一组线程在执行的过程中执行到某一个阶段(壁垒)时进行阻塞,等待到某个统一的状态再继续执行,这个状态称之为壁垒Barrier。CyclicBarrier是可以循环使用的,所以称为循环壁垒。
代码实例演示
工作类Worker2:
public class Worker2 implements Runnable {
String workerName;
CyclicBarrier cyclicBarrier;
public Worker2(String workerName,CyclicBarrier cyclicBarrier){
this.workerName = workerName;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println( workerName + " 工作中 ...");
try {
Thread.sleep(5000);
System.out.println(workerName + " 完成当前作业,正在等待其他线程执行");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(workerName + "所有线程执行完毕,继续执行当前线程任务.......");
}
}
主类:
public class ThreadsTest {
public static void main(String[] args) throws InterruptedException {
int n = 3;
System.out.println("CyclicBarrier test ....");
CyclicBarrier cyclicBarrier = new CyclicBarrier(n);
for(int i = 0; i < n; i++){
Worker2 worker2 = new Worker2("worker-" + i,cyclicBarrier);
Thread thread = new Thread(worker2);
thread.start();
}
//System.out.println("CyclicBarrier end ....");
}
}
执行效果:
可以看到所有线程都会进入一个阻塞状态,等到所有线程都执行到cyclicBarrier.await(),才继续往下执行。
我们看下主线程会不会被阻塞,在ThreadsTest主类的main方法加入:
System.out.println("CyclicBarrier end ....");
执行效果:
可以看到主线程并没有阻塞而是继续往下执行,所以CyclicBarrier阻塞的是子线程,是子线程间的同步。
实现原理解析
理清两个关键点:
- 线程如何被阻塞?
- 怎样被唤醒?
深入源码窥一番.
CyclicBarrier部分源码:
public class CyclicBarrier {
private static class Generation {
boolean broken = false;
}
/**
* The lock for guarding barrier entry
* 同步锁
*/
private final ReentrantLock lock = new ReentrantLock();
/**
* Condition to wait on until tripped
* 拦截器
*/
private final Condition trip = lock.newCondition();
/**
* The number of parties
* 每次拦截的线程数,CyclicBarrier初始化时指定
*/
private final int parties;
/**
* The command to run when tripped
* 换代之前执行的任务
*/
private final Runnable barrierCommand;
/**
* The current generation
* 新一代
*/
private Generation generation = new Generation();
/**
* Number of parties still waiting. Counts down from parties to 0
* on each generation. It is reset to parties on each new
* generation or when broken.
* 计数器,初始时等于parties,当一个线程执行到await()方法时count--
* 如果count == 0 说明所有线程都运行到壁垒状态,此时唤醒所有线程继续执行
* generation 进入下一代,可循环使用
*/
private int count;
/**
* 构造函数
* @param parties 表示拦截的线程数
* @param barrierAction 所有线程执行到壁垒状态后要执行的操作
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
/**
* 构造函数
* @param parties 表示拦截的线程数
*
*/
public CyclicBarrier(int parties) {
this(parties, null);
}
...
}
接着看最重要的方法:await()方法,线程被阻塞还是唤醒其他阻塞线程,均在这个方法中。
/**
* 不带参数的await()方法
*/
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
/**
* 带超时设置的awai()方法
*/
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
/**
* 核心方法
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// 一个线程进来会先加锁
lock.lock();
try {
// 获取当前的循环代的情况
final Generation g = generation;
// 如果当前代挂了,或者是新一代了,抛出错误
if (g.broken)
throw new BrokenBarrierException();
// 如果当前线程被中断,换代唤醒其他等到线程,抛出错误
if (Thread.interrupted()) {
// 设置当前屏障为已破坏(也就是已经挂了)的状态,并唤醒其他线程
breakBarrier();
throw new InterruptedException();
}
// 计数器减一
int index = --count;
// 如果当前线程是最后一个线程则index == 0,此时所有线程都到底壁垒状态,需要唤醒所有阻塞的线程
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
// 构造CyclicBarrier时如果指定了barrierCommand,则执行barrierCommand的run()方法
if (command != null)
command.run();
// 标识当前执行流程是否正常
ranAction = true;
// 进入下一代,唤醒所有等待的线程,暂时跳过,知道在这里唤醒线程就可以了
nextGeneration();
// 返回,方法结束
return 0;
} finally {
// 判断前面的代码是否正常执行,如果出现异常则进入下一代
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
// 线程阻塞,直到超时、中断、被打断、唤醒
for (;;) {
try {
// 没有超时设置,则调用trip.await()方法。下面代码可以暂时跳过,因为并不重要,直接看await()方法
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
await() 方法是Condition接口的一个方法,由AQS实现。主要作用作用:阻塞当前的线程并释放锁,直至该线程被唤醒或被中断。
public final void await() throws InterruptedException {
// 线程被中断,抛出错误
if (Thread.interrupted())
throw new InterruptedException();
// 当前线程加入阻塞队列
Node node = addConditionWaiter();
// 释放当前锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 判断当前线程是否已经加入阻塞队列中,并进行相关调度。这里不详细叙述
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
最后我们回来看一下唤醒否方法:
// CyclicBarrier类:
/** 源码注释写得很清楚了:唤醒所有线程,进入下一代。阻塞是调用Condition的await()方法
* Updates state on barrier trip and wakes up everyone.
* Called only while holding lock.
*/
private void nextGeneration() {
// signal completion of last generation
// 调用Condition的signalAll()方法,由AQS实现
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
// AQS类:
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
// 第一个节点不为空,说明当前有阻塞的线程,唤醒所有阻塞线程
if (first != null)
doSignalAll(first);
}
/**
* Removes and transfers all nodes.
* @param first (non-null) the first node on condition queue
*/
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
// 逐个唤醒
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
// 真正的唤醒方法
transferForSignal(first);
first = next;
} while (first != null);
}
/**
* 这里不进行详细展开,不然太长了。可以去看AQS中waitStatus状态变量的作用
* 只需要明白它是一个状态变量,用来标识线程是阻塞还是唤醒。
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
// 核心属性 waitStatus
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
到这里可以理一下整体的流程了:
Semaphore
Semaphore:信号量,可以控制同时访问的线程个数,通过acquire() 获得一个许可,通过release()释放一个许可。 获得许可的线程可以继续执行,没有获得许可的线程进入等待状态。
代码示例演示
我们假设现在有10个人要去饮水机打水,但是饮水机只有2个。此时先到的肯定先占用饮水机打水,慢到的则等待别人打完水,再试图抢占饮水机。
代码:
public class Worker3 implements Runnable{
String workerName;
Semaphore semaphore;
public Worker3(String workerName,Semaphore semaphore){
this.workerName = workerName;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
// 获取许可,其实就是试图获取资源
semaphore.acquire();
System.out.println( workerName + " 占用一个饮水机,正在打水 ...");
// 模拟打水
Thread.sleep(3000);
System.out.println(workerName + " 已经打完水,不再占用该饮水机 ...");
// 释放占用的资源
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
主类:
public class ThreadsTest {
public static void main(String[] args) throws InterruptedException {
int n = 10;
Semaphore semaphore = new Semaphore(2);
for(int i = 0; i < n; i++){
Worker3 worker3 = new Worker3("worker-" + i,semaphore);
Thread thread = new Thread(worker3);
thread.start();
}
}
}
测试结果:
我们可以看到每次最多只有两个人在打水,而且打水的顺序是无序的。这是因为Semaphore默认使用的使用的是非公平锁,也就是所有人不管先后都能去抢饮水机的使用权。如果要实现顺序,那么可以在初始化的时候指定为公平锁:
// true为公平锁,false为非公平锁
Semaphore semaphore = new Semaphore(2,true);
可以看到大家都按照先后顺序打水
上一篇: linux配置Hadoop环境(jdk+hadoop)
下一篇: Swarm Bee配置
推荐阅读
-
并发编程——彻底掌握CountDownLatch,CyclicBarrier和Semaphore
-
Java并发编程系列---Java中的并发工具类CountDownLatch、CyclicBarrier、Semaphore、Exchanger
-
Java并发编程:CountDownLatch与CyclicBarrier和Semaphore的实例详解
-
Java并发编程:CountDownLatch与CyclicBarrier和Semaphore的实例详解
-
并发编程CountDownLatch,CyclicBarrier,Semaphore实现原理分析
-
并发编程CountDownLatch,CyclicBarrier,Semaphore实现原理分析
-
并发编程(二)—— CountDownLatch、CyclicBarrier和Semaphore
-
Java并发编程中CountDownLatch和CyclicBarrier的使用
-
并发编程(二)—— CountDownLatch、CyclicBarrier和Semaphore
-
Java并发编程之CountDownLatch、CyclicBarrier和Semaphore