欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

互联网技术13——CountDownLatch和CyclicBarrier

程序员文章站 2022-07-11 11:53:44
...

CountDownLatch:

countdownLatch经常用于监听某些初始化操作,等待初始化执行完毕后,再通知主线程继续工作。

同步辅助类,在完成一组正在执行的操作之前,它允许一个或多个线程一直等待。实例化CountDownLatch时,需要制定唤醒次数。

当线程执行了CountDownLatch.await()时,则当前线程处于阻塞的等待状态。其他线程根据执行逻辑,执行唤醒方法

package com.company.concurrentTest;

import java.util.concurrent.CountDownLatch;

/**
 * Created by BaiTianShi on 2018/8/22.
 */
public class CountDownLatchTest {
    public static void main(String[] args) {
        final CountDownLatch latch = new CountDownLatch(2);


        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {

                    System.out.println("进入线程t1,等待其线程处理完成");
                    latch.await();

                    System.out.println("t1继续执行");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"t1");


        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("t2开始执行");
                    //模拟执行所需的时间
                    Thread.sleep(1000);
                    System.out.println("t2数据处理完毕,已经通知给了t1");
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"t2");
        Thread t3 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("t3开始执行");
                    //模拟执行所需的时间
                    Thread.sleep(2000);
                    System.out.println("t3数据处理完毕,已经通知给了t1");
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"t3");


        t2.start();
        t3.start();
        t1.start();

    }
}

执行结果

t2开始执行
t3开始执行
进入线程t1,等待其线程处理完成
t2数据处理完毕,已经通知给了t1
t3数据处理完毕,已经通知给了t1
t1继续执行

CountDownLatch是一个线程调用await()方法,处于自旋等待状态,注意这里不是阻塞的,而是自旋。也就是说当前线程不会放弃锁,是自动执行一套死循环,直到最初定义的latch=new CountDownLatch(2)中的2减到0 的时候。而这个2减到0的过程需要其它线程执行latch.countLatch方法,每执行一次会减1,直到由2减到0,这时自旋线程跳出死循环。关于自旋的源码,我发到下面。

互联网技术13——CountDownLatch和CyclicBarrier互联网技术13——CountDownLatch和CyclicBarrier互联网技术13——CountDownLatch和CyclicBarrier

 

CyclicBarrier

CyclicBarrier是JDK1.5的concurrent并发包中提供的一个工具类,所谓Cyclic既循环的意思,Barrier既屏障的意思。之所以用循环修饰,是因为所有线程释放彼此之后,这个屏障可以重新使用。

CyclicBarrier构造方法中,

1. 可以直接传入一个int类型参数指定等待次数n,barrier = new CyclicBarrier(int ),此时,各线程如果正常执行await()方法(这里先不考虑await(timeout,TimeUnit)这中情况 )时自旋等待,如果各线程执行await()次数总和达到指定的等待次数n,则各个线程同时继续执行。也可理解为运动会,大家在起跑线,都准备好后才可同时起跑。

2. 也可以同时传入一个实现了RunAble接口的类CyclicBarrier(int parties, Runnable barrierAction)

此时在await()方法自旋的线程必须等待这个实现runAble这个类的run()方法执行完,才会同时结束自旋,开始各自向下执行。

3.barrier可以重置,barrier.rest()。但是同样分两种情况,

a.如果没有等待的线程,此时代码会正常执行

b.如果有等待的线程,也就是说n尚未减到0,则会BrokenBarrierException异常,打开演示代码注释的部分,就能看到此效果。

4.如果把await()换成await(timeout,TimeUnit)则表示,等待线程超时时间,timeOut是超时时间数量,TimeUnit是时间单位可设置成时分秒等,如果等待超过了这个时间,则抛出BrokenBarrierException异常;

因为本人是边看着源码边查看技术博客,同时写演示代码,为了方便,各种情况揉合到一起,没有分开演示。如果有想学习,但是觉得太乱不好理解的可以联系我的qq:94514329。我会把代码拆分后发给你,或者做简单讲解

代码如下:

package com.company.concurrentTest;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Created by BaiTianShi on 2018/8/23.
 */
public class CyclicBarrierTest {
    public static void main(String[] args) {
        final CyclicBarrier barrier = new CyclicBarrier(4, new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(3000);
                    System.out.println("每次打开屏障后,都要执行我");

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {

                try {
                    //模拟处理数据的过程
                    System.out.println("t1开始处理数据");
                    Thread.sleep(2000);
                    System.out.println("t1数据处理完毕,执行了await方法");
                    barrier.await();
                    System.out.println("t1结束自旋,开始继续执行");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        },"t1");
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("t2开始处理数据");
                    Thread.sleep(3000);
                    System.out.println("t2数据处理完毕,执行了await方法");
                    barrier.await();
                    System.out.println("t2结束自旋,开始继续执行");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        },"t2");
        Thread t3 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("t3开始处理数据");
                    Thread.sleep(1000);
                    System.out.println("t3数据处理完毕,执行了await方法");
                    barrier.await();
                    System.out.println("t3结束自旋,开始继续执行");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        },"t3");
        t1.start();
        t2.start();
        t3.start();

//此注释为在有线程等待的情况下刷新
//        try {
//            Thread.sleep(5000);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
//        System.out.println("在有线程等待的情况下刷新,预期结果是抛出BrokenBarrierException异常");
//        barrier.reset();

        try {
            System.out.println("主线程等待中");
            barrier.await();
            Thread.sleep(5000);
            System.out.println("主线程结束等待,继续处理业务");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println("在没有线程等待的情况下刷新,预期结果是正常执行");
        barrier.reset();

        Thread t4 = new Thread(new Runnable() {
            @Override
            public void run() {

                try {
                    //模拟处理数据的过程
                    System.out.println("t4开始处理数据");
                    Thread.sleep(2000);
                    System.out.println("t4数据处理完毕,执行了await方法");
                    barrier.await();
                    System.out.println("t4结束自旋,开始继续执行");
                    Thread.sleep(8000);
                    System.out.println("如果打印,说明有线程等待超时,抛出异常后,其他线程会结束等待状态");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        },"t4");
        Thread t5 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("t5开始处理数据");
                    Thread.sleep(3000);
                    System.out.println("t5数据处理完毕,执行了await方法");
                    barrier.await();
                    System.out.println("t5结束自旋,开始继续执行");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        },"t5");

        t4.start();
        t5.start();
        try {
            System.out.println("主线程带有计时的等待");
            barrier.await(4, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            System.out.println("主线程等待超时,抛出超时异常");
            e.printStackTrace();
        }
    }

}

运行结果:

主线程等待中
t1开始处理数据
t2开始处理数据
t3开始处理数据
t3数据处理完毕,执行了await方法
t1数据处理完毕,执行了await方法
t2数据处理完毕,执行了await方法
每次打开屏障后,都要执行我
t2结束自旋,开始继续执行
t3结束自旋,开始继续执行
t1结束自旋,开始继续执行
主线程结束等待,继续处理业务
在没有线程等待的情况下刷新,预期结果是正常执行
主线程带有计时的等待
t4开始处理数据
t5开始处理数据
t4数据处理完毕,执行了await方法
t5数据处理完毕,执行了await方法
主线程等待超时,抛出超时异常
java.util.concurrent.BrokenBarrierException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
	at com.company.concurrentTest.CyclicBarrierTest$5.run(CyclicBarrierTest.java:112)
	at java.lang.Thread.run(Thread.java:745)
java.util.concurrent.TimeoutException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:257)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
	at com.company.concurrentTest.CyclicBarrierTest.main(CyclicBarrierTest.java:144)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
java.util.concurrent.BrokenBarrierException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
	at com.company.concurrentTest.CyclicBarrierTest$6.run(CyclicBarrierTest.java:130)
	at java.lang.Thread.run(Thread.java:745)

 

互联网技术13——CountDownLatch和CyclicBarrier

 

 

 

 

 

相关标签: 多线程