欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

并发编程——彻底掌握CountDownLatch,CyclicBarrier和Semaphore

程序员文章站 2024-03-12 12:09:26
...

一级目录

二级目录

三级目录

CountDownLatch

见前一篇:并发编程——一文看尽CountDownLatch原理

CyclicBarrier 作用

CountDownLatch是某个线程A执行的过程中,等待其他的一个或多个线程全部执行完毕,线程A再继续执行当前的流程,起到一个同步等待的作用。

CyclicBarrier(循环壁垒)同样起到一个同步等待的作用的,但它的作用是让一组线程在执行的过程中执行到某一个阶段(壁垒)时进行阻塞,等待到某个统一的状态再继续执行,这个状态称之为壁垒Barrier。CyclicBarrier是可以循环使用的,所以称为循环壁垒。

代码实例演示

工作类Worker2:

public class Worker2 implements Runnable {
    String workerName;
    CyclicBarrier cyclicBarrier;
    public Worker2(String workerName,CyclicBarrier cyclicBarrier){
        this.workerName = workerName;
        this.cyclicBarrier = cyclicBarrier;

    }
    @Override
    public void run() {
        System.out.println( workerName + " 工作中 ...");
        try {
            Thread.sleep(5000);
            System.out.println(workerName + " 完成当前作业,正在等待其他线程执行");
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println(workerName + "所有线程执行完毕,继续执行当前线程任务.......");
    }
}

主类:

public class ThreadsTest {
    public static void main(String[] args) throws InterruptedException {
        int n = 3;
        System.out.println("CyclicBarrier test ....");
        CyclicBarrier cyclicBarrier = new CyclicBarrier(n);
        for(int i = 0; i < n; i++){
            Worker2 worker2 = new Worker2("worker-" + i,cyclicBarrier);
            Thread thread = new Thread(worker2);
            thread.start();
        }
        //System.out.println("CyclicBarrier end ....");

    }
}

执行效果:


并发编程——彻底掌握CountDownLatch,CyclicBarrier和Semaphore

可以看到所有线程都会进入一个阻塞状态,等到所有线程都执行到cyclicBarrier.await(),才继续往下执行。


我们看下主线程会不会被阻塞,在ThreadsTest主类的main方法加入:

System.out.println("CyclicBarrier end ....");

执行效果:

并发编程——彻底掌握CountDownLatch,CyclicBarrier和Semaphore


可以看到主线程并没有阻塞而是继续往下执行,所以CyclicBarrier阻塞的是子线程,是子线程间的同步。

实现原理解析

理清两个关键点:

  • 线程如何被阻塞?
  • 怎样被唤醒?


深入源码窥一番.

CyclicBarrier部分源码:

public class CyclicBarrier {

    private static class Generation {
        boolean broken = false;
    }

    /** 
    * The lock for guarding barrier entry 
    * 同步锁
    */
    private final ReentrantLock lock = new ReentrantLock();
    /** 
    * Condition to wait on until tripped 
    * 拦截器
    */
    private final Condition trip = lock.newCondition();
    /** 
    * The number of parties 
    * 每次拦截的线程数,CyclicBarrier初始化时指定
    */
    private final int parties;
    /**
    * The command to run when tripped 
    * 换代之前执行的任务
    */
    private final Runnable barrierCommand;
    /** 
    * The current generation 
    * 新一代
    */
    private Generation generation = new Generation();

    /**
     * Number of parties still waiting. Counts down from parties to 0
     * on each generation.  It is reset to parties on each new
     * generation or when broken.
     * 计数器,初始时等于parties,当一个线程执行到await()方法时count--
     * 如果count == 0 说明所有线程都运行到壁垒状态,此时唤醒所有线程继续执行
     * generation 进入下一代,可循环使用
     */
    private int count;
    
         
     /**
     * 构造函数 
     * @param parties 表示拦截的线程数
     * @param barrierAction 所有线程执行到壁垒状态后要执行的操作
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
    
    /**
     * 构造函数 
     * @param parties 表示拦截的线程数
     * 
     */
    public CyclicBarrier(int parties) {
        this(parties, null);
    }
    ...
    
}

接着看最重要的方法:await()方法,线程被阻塞还是唤醒其他阻塞线程,均在这个方法中。

    /**
    * 不带参数的await()方法
    */
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
    
    /**
    * 带超时设置的awai()方法
    */
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

    /**
    * 核心方法
    * Main barrier code, covering the various policies.
    */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        // 一个线程进来会先加锁
        lock.lock();
        try {
            // 获取当前的循环代的情况
            final Generation g = generation;
            
            // 如果当前代挂了,或者是新一代了,抛出错误
            if (g.broken)
                throw new BrokenBarrierException();
            // 如果当前线程被中断,换代唤醒其他等到线程,抛出错误
            if (Thread.interrupted()) {
                // 设置当前屏障为已破坏(也就是已经挂了)的状态,并唤醒其他线程
                breakBarrier();
                throw new InterruptedException();
            }
            // 计数器减一
            int index = --count;
            // 如果当前线程是最后一个线程则index == 0,此时所有线程都到底壁垒状态,需要唤醒所有阻塞的线程
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    // 构造CyclicBarrier时如果指定了barrierCommand,则执行barrierCommand的run()方法
                    if (command != null)
                        command.run();
                    // 标识当前执行流程是否正常
                    ranAction = true;
                    // 进入下一代,唤醒所有等待的线程,暂时跳过,知道在这里唤醒线程就可以了
                    nextGeneration();
                    // 返回,方法结束
                    return 0;
                } finally {
                    // 判断前面的代码是否正常执行,如果出现异常则进入下一代
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            // 线程阻塞,直到超时、中断、被打断、唤醒
            for (;;) {
                try {
                    // 没有超时设置,则调用trip.await()方法。下面代码可以暂时跳过,因为并不重要,直接看await()方法
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

await() 方法是Condition接口的一个方法,由AQS实现。主要作用作用:阻塞当前的线程并释放锁,直至该线程被唤醒或被中断。

        public final void await() throws InterruptedException {
            // 线程被中断,抛出错误
            if (Thread.interrupted())
                throw new InterruptedException();
            // 当前线程加入阻塞队列
            Node node = addConditionWaiter();
            // 释放当前锁
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // 判断当前线程是否已经加入阻塞队列中,并进行相关调度。这里不详细叙述
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

最后我们回来看一下唤醒否方法:

    // CyclicBarrier类:
    /** 源码注释写得很清楚了:唤醒所有线程,进入下一代。阻塞是调用Condition的await()方法
     * Updates state on barrier trip and wakes up everyone.
     * Called only while holding lock.
     */
    private void nextGeneration() {
        // signal completion of last generation
        // 调用Condition的signalAll()方法,由AQS实现
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

    // AQS类:
    public final void signalAll() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        // 第一个节点不为空,说明当前有阻塞的线程,唤醒所有阻塞线程
        if (first != null)
            doSignalAll(first);
    }
    
    
    /**
    * Removes and transfers all nodes.
    * @param first (non-null) the first node on condition queue
    */
    private void doSignalAll(Node first) {
        lastWaiter = firstWaiter = null;
        // 逐个唤醒
        do {
            Node next = first.nextWaiter;
            first.nextWaiter = null;
            // 真正的唤醒方法
            transferForSignal(first);
            first = next;
        } while (first != null);
    }
    
    /**
     * 这里不进行详细展开,不然太长了。可以去看AQS中waitStatus状态变量的作用
     * 只需要明白它是一个状态变量,用来标识线程是阻塞还是唤醒。
     */
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        // 核心属性 waitStatus
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
    

到这里可以理一下整体的流程了:

并发编程——彻底掌握CountDownLatch,CyclicBarrier和Semaphore

Semaphore

Semaphore:信号量,可以控制同时访问的线程个数,通过acquire() 获得一个许可,通过release()释放一个许可。 获得许可的线程可以继续执行,没有获得许可的线程进入等待状态。

代码示例演示

我们假设现在有10个人要去饮水机打水,但是饮水机只有2个。此时先到的肯定先占用饮水机打水,慢到的则等待别人打完水,再试图抢占饮水机。

代码:

public class Worker3 implements Runnable{

    String workerName;
    Semaphore semaphore;
    public Worker3(String workerName,Semaphore semaphore){
        this.workerName = workerName;
        this.semaphore = semaphore;

    }
    @Override
    public void run() {
        try {
            // 获取许可,其实就是试图获取资源
            semaphore.acquire();
            System.out.println( workerName + " 占用一个饮水机,正在打水 ...");
            // 模拟打水
            Thread.sleep(3000);
            System.out.println(workerName + " 已经打完水,不再占用该饮水机 ...");
            // 释放占用的资源
            semaphore.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

主类:

public class ThreadsTest {
    public static void main(String[] args) throws InterruptedException {
        int n = 10;
        Semaphore semaphore = new Semaphore(2);
        for(int i = 0; i < n; i++){
            Worker3 worker3 = new Worker3("worker-" + i,semaphore);
            Thread thread = new Thread(worker3);
            thread.start();
        }
    }
}

测试结果:
并发编程——彻底掌握CountDownLatch,CyclicBarrier和Semaphore


我们可以看到每次最多只有两个人在打水,而且打水的顺序是无序的。这是因为Semaphore默认使用的使用的是非公平锁,也就是所有人不管先后都能去抢饮水机的使用权。如果要实现顺序,那么可以在初始化的时候指定为公平锁:

// true为公平锁,false为非公平锁
Semaphore semaphore = new Semaphore(2,true);

可以看到大家都按照先后顺序打水

并发编程——彻底掌握CountDownLatch,CyclicBarrier和Semaphore

相关标签: 并发编程