欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

JUC——线程同步辅助工具类(Exchanger,CompletableFuture)

程序员文章站 2022-05-18 22:22:23
Exchanger交换空间 如果现在有两个线程,一个线程负责生产数据,另外一个线程负责消费数据,那么这个两个线程之间一定会存在一个公共的区域,那么这个区域的实现在JUC包之中称为Exchanger。 java.util.concurrent.Exchanger类表示一种两个线程可以进行互相交换对象的 ......

Exchanger交换空间

如果现在有两个线程,一个线程负责生产数据,另外一个线程负责消费数据,那么这个两个线程之间一定会存在一个公共的区域,那么这个区域的实现在JUC包之中称为Exchanger

java.util.concurrent.Exchanger类表示一种两个线程可以进行互相交换对象的汇合点。

JUC——线程同步辅助工具类(Exchanger,CompletableFuture)

Exchanger类中定义的方法如下:

  • 构造方法:
    pubilc Exchanger();  //创建一个对象
  • 设置与取得:
    public V exchange(V x) throws InterruptedException

范例:使用Exchanger实现交换处理

package so.strong.mall.concurrent;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;

public class ExchanerDemo {
    public static void main(String[] args) {
        final Exchanger<String> exchanger = new Exchanger<>(); //准备一个交换空间
        for (int i = 0; i < 3; i++) { //3个消费者
            new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        try {
                            String data = exchanger.exchange(null);
                            TimeUnit.SECONDS.sleep(2);
                            if (data != null) {
                                System.out.println("[" + Thread.currentThread().getName() + "]取得数据:" + data);
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            }, "消费者-" + i).start();
        }

        for (int i = 0; i < 2; i++) { //2个生产者
            final int temp = i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < 2; j++) {
                        String data = "iTermis-" + temp + "-" + j;
                        try {
                            TimeUnit.SECONDS.sleep(2); //让生产者节奏放慢
                            exchanger.exchange(data);
                            System.out.println("[" + Thread.currentThread().getName() + "]生产了数据:" + data);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            }, "生产者-" + i).start();
        }
    }
}
[生产者-1]生产了数据:iTermis-1-0
[生产者-1]生产了数据:iTermis-1-1
[消费者-1]取得数据:iTermis-1-0
[生产者-0]生产了数据:iTermis-0-0
[生产者-0]生产了数据:iTermis-0-1
[消费者-2]取得数据:iTermis-0-1

  

CompletableFuture线程回调

现在设想一个场景,例如:使用炮兵轰炸某一目标

JUC——线程同步辅助工具类(Exchanger,CompletableFuture)

所有的执行线程在接收到命令之前都要进入到阻塞状态之中,一直到接收到具体的命令之后才会执行下一步操作处理。

java.util.concurrent.CompletableFutureJava8中添加的一个类,该类的主要作用是提供了新的方式来完成异步处理,包括合成和组合事件的非阻塞方式。

JUC——线程同步辅助工具类(Exchanger,CompletableFuture)

CompletableFuture类中有如下的方法:

  • 构造方法:
    public CompletableFuture();
  • 获取命令:
    public T get() throws InterruptedException,ExecutionException  

范例:使用CompletableFuture实现炮兵轰炸操作

package com.itermis.concurrent;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class CompletableFutureDemo { public static void main(String[] args) throws Exception { CompletableFuture<String> future = new CompletableFuture<>(); for (int i = 0; i < 4; i++) { new Thread(() -> { System.out.println("BEFORE[" + Thread.currentThread().getName() + "]进入炮兵阵地,等待命令,准备开火。"); try { String cmd = future.get(); //接收命令 if ("fire".equals(cmd)) { System.out.println("AFTER[" + Thread.currentThread().getName() + "]接收到命令,立刻开火,干死那个死胖子。。"); } if ("cancel".equals(cmd)) { System.out.println("AFTER[" + Thread.currentThread().getName() + "]收到撤退命令,回家睡觉。。"); } } catch (Exception e) { e.printStackTrace(); } }, "炮兵-" + i).start(); } TimeUnit.SECONDS.sleep(3); //等待3秒钟 future.complete("cancel"); //给出了执行命令 } }
BEFORE[炮兵-1]进入炮兵阵地,等待命令,准备开火。
BEFORE[炮兵-0]进入炮兵阵地,等待命令,准备开火。
BEFORE[炮兵-2]进入炮兵阵地,等待命令,准备开火。
BEFORE[炮兵-3]进入炮兵阵地,等待命令,准备开火。
//sleep 3 秒
AFTER[炮兵-1]收到撤退命令,回家睡觉。。
AFTER[炮兵-0]收到撤退命令,回家睡觉。。
AFTER[炮兵-2]收到撤退命令,回家睡觉。。
AFTER[炮兵-3]收到撤退命令,回家睡觉。。

 该类的处理主要是建立在Future线程模型的基础之上的实现操作。

对于本类而言,除了以上的使用方式之外还可以采用异步的线程执行方式处理。在创建CompletableFuture类对象的时候还可以使用这个类之中提供的一种静态方法:

public static CompletableFuture<Void> runAsync(Runnable runnable)

范例:更换实现方式实现上述轰炸操作:

package com.itermis.concurrent;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class CompletableFutureDemoII {
    public static void main(String[] args) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            System.out.println("[FUTURE]将军正在温柔乡里美梦了,等着将军睡醒开炮..");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("[FUTURE]将军醒了,开始干活了..");
        });
        for (int i = 0; i < 4; i++) {
            new Thread(() -> {
                System.out.println("BEFORE[" + Thread.currentThread().getName() + "]进入炮兵阵地,等待命令,准备开火。");
                try {
                    System.out.println("AFTER[" + Thread.currentThread().getName() + "]接收到命令,立刻开火,干死那个死胖子。。" + future.get());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, "炮兵-" + i).start();
        }
    }
}
[FUTURE]将军正在温柔乡里美梦了,等着将军睡醒开炮..
BEFORE[炮兵-1]进入炮兵阵地,等待命令,准备开火。
BEFORE[炮兵-0]进入炮兵阵地,等待命令,准备开火。
BEFORE[炮兵-2]进入炮兵阵地,等待命令,准备开火。
BEFORE[炮兵-3]进入炮兵阵地,等待命令,准备开火。
// sleep 3秒
[FUTURE]将军醒了,开始干活了..
AFTER[炮兵-2]接收到命令,立刻开火,干死那个死胖子。。null
AFTER[炮兵-0]接收到命令,立刻开火,干死那个死胖子。。null
AFTER[炮兵-3]接收到命令,立刻开火,干死那个死胖子。。null
AFTER[炮兵-1]接收到命令,立刻开火,干死那个死胖子。。null

CompletableFuture这个类最大的好处是提供有所有等待线程的执行触发点。