欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

并发编程的艺术之读书笔记(十四)

程序员文章站 2022-05-13 15:04:35
...

前言:

上一部分我们一起学习了java中的原子类,这一部分,我们来学习java中的并发工具类

1. CountDownLatch

CountDownLatch是一个同步工具类,它允许一个或多个线程等待其他线程各自执行完毕后再执行。CountDownLatch通过计数器来实现,CountDownLatch的构造函数接收一个int类型的参数作为计数器,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就-1,countDownLatch的await()方法会阻塞当前线程直到计数器为0,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作了。需要注意的是,计数器必须大于等于0,只是等于0的时候,计数器就是0,调用await()方法时不会阻塞当前线程。CountDownLatch不可能重新初始化或者修改CountDownLatch对象的内部计数器的值。一个线程调用countDown方法happens-before另一个线程调用await方法。

下面我们来看看源码,首先是CountDownLatch的构造函数

public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

其中count是计数值,我们看到这里有个Sync类,可以想到CountDownLatch也是基于AQS同步器实现的。我们还看到类中有这么三个方法

//调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
public void await() throws InterruptedException {}
//和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {}
////将count值减1
public void countDown(){}

示例

public class CountDownLatchTest {

    public static void main(String[] args) {
        final CountDownLatch latch = new CountDownLatch(2);
        System.out.println("主线程开始执行…… ……");
        //第一个子线程执行
        ExecutorService es1 = Executors.newSingleThreadExecutor();
        es1.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(3000);
                    System.out.println("子线程:"+Thread.currentThread().getName()+"执行");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                latch.countDown();
            }
        });
        es1.shutdown();

        //第二个子线程执行
        ExecutorService es2 = Executors.newSingleThreadExecutor();
        es2.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("子线程:"+Thread.currentThread().getName()+"执行");
                latch.countDown();
            }
        });
        es2.shutdown();
        System.out.println("等待两个线程执行完毕…… ……");
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("两个子线程都执行完毕,继续执行主线程");
    }
}

结果是

主线程开始执行…… ……
等待两个线程执行完毕…… ……
子线程:pool-1-thread-1执行
子线程:pool-2-thread-1执行
两个子线程都执行完毕,继续执行主线程

2. CyclicBarrier

CyclicBarrier的意思是同步屏障,它可以循环使用,它所能做到的是让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会打开。我们还是来看一下构造方法

public CyclicBarrier(int parties)
public CyclicBarrier(int parties, Runnable barrierAction)

其中paties参数表示屏障拦截的线程数,第二个构造方法中的barrierAction表示最后一个到达线程要做的任务。每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞,举例如下

public class CyclicBarrierTest {
    static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

    public static void main(String[] args) {
        //新线程
        new Thread(() -> {
            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println(1);
        }).start();
        //主线程
        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println(2);
    }
}

如果把new CyclicBarrier(2)改成new CyclicBarrier(3),那么主线程和子线程将会永远等待,因为没有第三个线程来执行await,所以之前按到达屏障的线程不会继续执行。接下来我们再来看一下第二个构造函数参数的用法

public class CyclicBarrierTest {
    static CyclicBarrier cyclicBarrier = new CyclicBarrier(2,new A());

    public static void main(String[] args) {
        //新线程
        new Thread(() -> {
            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println(1);
        }).start();
        //主线程
        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println(2);
    }
    static class A implements Runnable{

        @Override
        public void run() {
            System.out.println(3);
        }
    }
}

CyclicBarrier可以用于多线程计算数据,最后合并数据的场景。

看过了CyclicBarrier和CountDownLatch可能会觉得他们两个类和功能有点像,那么CyclicBarrier和CountDownLatch有什么区别呢,首先CountDownLatch的计数器只能用一次,而CyclicBarrier的计数器可以使用reset()方法重置。CyclicBarrier可以处理更复杂的业务场景。

3. 控制并发线程数的Semaphore

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,保证合理使用公共资源。这个信号量的概念可能有点难以理解,举个形象点的例子,比如说在超市买东西结账,只开了3个收银窗口,那么同时只能有3个人可以结账,第四个人来了就只能在后面排队,不可以结账。Semaphore的使用方法比较简单,Semaphore的构造方法接收一个整形数字,代表可用的许可证数量,然后线程调用acquire方法获取一个许可证,使用完后调用release归还许可证就可以了。

同时Semaphore还提供一些其他方法

  • int availablePermits() 返回此信号量中当前可用的许可证数
  • int getQueueLength() 返回正在等待获取许可证的线程数
  • boolean hasQueuedThreads() 是否有线程正在等待获取许可证
  • void reducePermits 减少reduction个许可证,是protected方法
  • Collection<Thread> getQueuedThreads() 返回所有等待获取许可证的线程集合,是protected方法

4. 线程间交换数据的Exchanger

Exchanger是一个用于线程间协作的工具类,Exchanger用于线程间的数据交换,它提供一个同步点,在这个同步点,两个线程可以交换数据,如果第一个线程先执行exchange方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给哦对方。

下面来举个例子演示一下用法

Exchanger可以用于校对工作,比如我们需要将纸质银行流水通过人工录入的方式录入成电子银行流水,为了避免错误,采用AB两人录入,录入到Excel以后,系统要加载这两个Excel,并对两个Excel数据进行比较看看是否录入一致,代码如下

public class ExchangerTest {
    public static final Exchanger<String> exchanger = new Exchanger<>();
    private static ExecutorService threadTool = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {
        threadTool.execute(() -> {
            try {
                String a = "银行流水a";
                exchanger.exchange(a);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        threadTool.execute(() -> {
            try {
                String b = "银行流水b";
                String a = exchanger.exchange(b);
                System.out.println("a和b的数据是否一致," + a.equals(b) + ",a录入的是" + a + ",b录入的是" + b);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        threadTool.shutdown();
    }
}

总结

本部分我们简单的学习了一下java中的并发工具类,下一部分我们将来学习java中的线程池。

相关标签: 并发编程的艺术