多线程编程学习九(并发工具类).
程序员文章站
2022-07-28 20:50:22
CountDownLatch 1. CountDownLatch 允许一个或多个线程等待其他线程完成操作。 2. CountDownLatch 可以替代 join 的作用,并提供了更丰富的用法。 3. CountDownLatch 的 countDown 方法,N 会减1;CountDownLatc ......
countdownlatch
- countdownlatch 允许一个或多个线程等待其他线程完成操作。
- countdownlatch 可以替代 join 的作用,并提供了更丰富的用法。
- countdownlatch 的 countdown 方法,n 会减1;countdownlatch 的 await 方法会阻塞当前线程,直到 n 变成零。
- countdownlatch 不可能重新初始化或者修改 countdownlatch 对象的内部计数器的值。
- countdownlatch 内部由 aqs 共享锁实现。
public class countdownlatchtest { private static final countdownlatch down_latch = new countdownlatch(2); public static void main(string[] args) throws interruptedexception { new thread(() -> { system.out.println(1); down_latch.countdown(); system.out.println(2); down_latch.countdown(); }).start(); down_latch.await(); system.out.println("3"); } }
cyclicbarrier
- cyclicbarrier 设置一个屏障(也可以叫同步点),拦截阻塞一组线程,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
- cyclicbarrier 默认的构造方法是cyclicbarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉 cyclicbarrier 我已经到达了屏障,然后当前线程被阻塞。
- cyclicbarrier 还提供一个更高级的构造函数 cyclicbarrier(int parties,runnable barrieraction),用于在线程到达屏障时,优先执行 barrieraction,方便处理更复杂的业务场景。
- getnumberwaiting 方法可以获得 cyclicbarrier 阻塞的线程数量;isbroken()方法用来了解阻塞的线程是否被中断。
- cyclicbarrier 的计数器可以使用 reset() 方法重置(countdownlatch 的计数器只能使用一次)。所以 cyclicbarrier 能处理更为复杂的业务场景。例如,如果计算发生错误,可以重置计数器,并让线程重新执行一次。
- cyclicbarrier 可以用于多线程计算数据,最后合并计算结果的场景。
- cyclicbarrier 内部采用重入锁 reentrantlock 实现。
public class bankwaterservice implements runnable { // 创建4个屏障,处理完之后执行当前类的run方法 private cyclicbarrier barrier = new cyclicbarrier(4, this); // 假设有4个计算任务,所以只启动4个线程 private executor executor = executors.newfixedthreadpool(4); // 保存每个任务的计算结果 private concurrenthashmap<string, integer> sheetbankwatercount = new concurrenthashmap<>(); private atomicinteger atomicinteger = new atomicinteger(1); private void count() { for (int i = 0; i < 4; i++) { thread thread = new thread(() -> { // 当前任务的计算结果,计算过程忽略 sheetbankwatercount.put(thread.currentthread().getname(), 1); // 计算完成,插入一个屏障 try { barrier.await(); } catch (interruptedexception e) { e.printstacktrace(); } catch (brokenbarrierexception e) { e.printstacktrace(); } }, "线程" + atomicinteger.getandincrement()); executor.execute(thread); } } @override public void run() { int result = 0; // 汇总每个任务计算出的结果 for (map.entry<string, integer> sheet : sheetbankwatercount.entryset()) { result += sheet.getvalue(); } //将结果输出 sheetbankwatercount.put("result", result); system.out.println(result); } public static void main(string[] args) { bankwaterservice bankwatercount = new bankwaterservice(); bankwatercount.count(); } }
semaphore
- semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
- semaphore 可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。
- semaphore的构造方法 semaphore(int permits) 接受一个整型的数字,表示可用的许可证数量。
- 首先线程使用 semaphore 的 acquire() 方法获取一个许可证,使用完之后调用 release() 方法归还许可证。还可以用 tryacquire() 方法尝试获取许可证。
- intavailablepermits():返回此信号量中当前可用的许可证数。
- intgetqueuelength():返回正在等待获取许可证的线程数。
- booleanhasqueuedthreads():是否有线程正在等待获取许可证。
- semaphore 内部使用 aqs 共享锁实现。
public class semaphoretest { private static final int thread_count = 30; private static executorservice executor = executors.newfixedthreadpool(thread_count); private static semaphore semaphore = new semaphore(10); private static atomicinteger atomicinteger = new atomicinteger(1); public static void main(string[] args) { for (int i = 0; i < thread_count; i++) { executor.execute(() -> { try { semaphore.acquire(); system.out.println("save data" + atomicinteger.getandincrement()); semaphore.release(); } catch (interruptedexception e) { } }); } executor.shutdown(); } }
exchanger
- exchanger(交换者)是一个用于线程间协作的工具类 —— 用于线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过 exchange 方法交换数据,如果第一个线程先执行 exchange() 方法,它会一直等待第二个线程也执行 exchange 方法。
- 可简单地将 exchanger 对象理解为一个包含两个格子的容器,通过 exchanger 方法可以向两个格子中填充信息。当两个格子中的均被填充时,该对象会自动将两个格子的信息交换,然后返回给线程,从而实现两个线程的信息交换。
- exchanger 可用于遗传算法。(遗传算法:需要选出两个人作为交配对象,这时候会交换两人的数据,并使用交叉规则得出交配结果)
- exchanger 可用于校对工作,比如一份数据需要两个人同时进行校对,都校对无误后,才能进行后续处理。这时,就可以使用 exchanger 比较两份校对结果。
- exchanger 内部采用无锁 cas 实现,exchange 使用了内部对象 node 的两个属性 — item 、match,分布存储两个线程的值。
public class exchangertest { private static final exchanger<string> exchange = new exchanger<>(); private static executorservice threadpool = executors.newfixedthreadpool(2); public static void main(string[] args) { threadpool.execute(() -> { try { string result = exchange.exchange("数据a"); system.out.println("a的exchange结果:" + result); } catch (interruptedexception e) { } }); threadpool.execute(() -> { try { string result = exchange.exchange("数据b"); system.out.println("b的exchange结果:" + result); } catch (interruptedexception e) { } }); threadpool.shutdown(); } }