Java中的Future类使用
程序员文章站
2022-07-10 19:13:51
Future概述Future是一个接口,表明处理异步计算的未来结果。源码以线程池为例public class ThreadPoolExecutor extends AbstractExecutorService { ...}public abstract class AbstractExecutorService implements ExecutorService { ... // submit方法返回异步计算结果 // 直接用Callable对象做...
Future
概述
Future是一个接口,表明处理异步计算的未来结果。
源码
以线程池为例
public class ThreadPoolExecutor extends AbstractExecutorService {
...
}
public abstract class AbstractExecutorService implements ExecutorService {
...
// submit方法返回异步计算结果
// 直接用Callable对象做参数
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
// RunnableFuture 继承Runnable和Future并重写了run
// newTaskFor方法创建FutureTask对象,内部会调用FutureTask类的构造方法new FutureTask<T>(callable);
RunnableFuture<T> ftask = newTaskFor(task);
// AbstractExecutorService是抽象类,ExecutorService的execute方法没有实现,最后调用的是线程池的execute方法
execute(ftask);
return ftask;
}
// 用Runnable和result包装成Callable实现类
public Future<?> submit(Runnable task,T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
}
FutureTask类
public class FutureTask<V> implements RunnableFuture<V> {
...
// 刚创建时是NEW
// COMPLETING是通过CAS修改成,表示正在修改值(多线程下Futrue对象被多个线程持有的情况)
// NORMAL表示修改完成,可以获取返回值
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
// 异步结果
private Object outcome;
private Callable<V> callable;
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
}
public FutureTask(Runnable runnable, V result) {
// 返回的是Executors类中有个静态内部类叫RunnableAdapter(适配器类,实现了Callable)
this.callable = Executors.callable(runnable, result);
this.state = NEW;
}
// 重写RunnableFuture中重写Runnable的run()
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
// 给outcome赋值
set(result);
}
...
}
}
// 方法get()将堵塞执行,直到任务完成调用report()返会outcome。
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
// 任务完成后将返回结果赋予outcome
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
}
栗子
public class FutureDemo{
private static ExecutorService e= Executors.newFixedThreadPool(10);
public static void main(String[] args) {
Future<Integer> future = e.submit(() -> {
Thread.sleep(1000);
return 10;
});
while (!future.isDone()) System.out.println("wait");
try {
System.out.println(future.get());
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
思考
Future大多数是在线程池进行异步处理,如果需要对Future对象进行处理,需要另一个线程(消息队列)来轮询处理。guava提供了ListenableFuture来进行异步增强。(貌似jdk8的CompletableFuture就是从此改进而来)
public class FutureDemo{
// 创建一个ListenableFutureTask对象
ListenableFutureTask<String> task = ListenableFutureTask.create(new Callable<String>() {
@Override
public String call() throws Exception {
return "fail";
}
});
task.addListener(new Runnable() {
@Override
public void run() {
try {
System.out.println("result is " + listenableFuture.get());
} catch (Exception e) {
e.printStackTrace();
}
}
}, MoreExecutors.directExecutor());
}
public final class MoreExecutors {
// DirectExecutor是个枚举类,以此保证安全单例
public static Executor directExecutor() {
return MoreExecutors.DirectExecutor.INSTANCE;
}
}
本文地址:https://blog.csdn.net/qq_33216694/article/details/107492669
上一篇: CentOS 6 安装QQ
下一篇: 正则表达式之分割功能和练习1