欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Java中的Future类使用

程序员文章站 2022-07-10 19:13:51
Future概述Future是一个接口,表明处理异步计算的未来结果。源码以线程池为例public class ThreadPoolExecutor extends AbstractExecutorService { ...}public abstract class AbstractExecutorService implements ExecutorService { ... // submit方法返回异步计算结果 // 直接用Callable对象做...

Future

概述

Future是一个接口,表明处理异步计算的未来结果。


源码

以线程池为例

public class ThreadPoolExecutor extends AbstractExecutorService {
    ...
}


public abstract class AbstractExecutorService implements ExecutorService {
    ...
    //  submit方法返回异步计算结果
    //  直接用Callable对象做参数
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        //  RunnableFuture 继承Runnable和Future并重写了run
        //  newTaskFor方法创建FutureTask对象,内部会调用FutureTask类的构造方法new FutureTask<T>(callable);
        RunnableFuture<T> ftask = newTaskFor(task);
        //  AbstractExecutorService是抽象类,ExecutorService的execute方法没有实现,最后调用的是线程池的execute方法
        execute(ftask);
        return ftask;
    }
        
    //  用Runnable和result包装成Callable实现类
    public Future<?> submit(Runnable task,T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, result); 
        execute(ftask);
        return ftask;
    }
}

FutureTask类

public class FutureTask<V> implements RunnableFuture<V> {
    ...
    //  刚创建时是NEW
    //  COMPLETING是通过CAS修改成,表示正在修改值(多线程下Futrue对象被多个线程持有的情况)
    //  NORMAL表示修改完成,可以获取返回值
    private volatile int state;    
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;
    //  异步结果    
    private Object outcome;
    private Callable<V> callable;
    
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       
    }
    
    public FutureTask(Runnable runnable, V result) {
        //  返回的是Executors类中有个静态内部类叫RunnableAdapter(适配器类,实现了Callable)
        this.callable = Executors.callable(runnable, result);
       	this.state = NEW;  
    }
    
    //  重写RunnableFuture中重写Runnable的run()
    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    //  给outcome赋值
                    set(result);
            }
            ...
        } 
    }
    
    //  方法get()将堵塞执行,直到任务完成调用report()返会outcome。
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
    
    //  任务完成后将返回结果赋予outcome
    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }
}

栗子

public class FutureDemo{
    private static ExecutorService e= Executors.newFixedThreadPool(10);
    public static void main(String[] args) {
        Future<Integer> future = e.submit(() -> {
            Thread.sleep(1000);
            return 10;
        });
        while (!future.isDone())  System.out.println("wait");
        try {
            System.out.println(future.get());
        } catch (Exception e1) {
            e1.printStackTrace();
        }
    }
}

思考

Future大多数是在线程池进行异步处理,如果需要对Future对象进行处理,需要另一个线程(消息队列)来轮询处理。guava提供了ListenableFuture来进行异步增强。(貌似jdk8的CompletableFuture就是从此改进而来)

public class FutureDemo{
    //  创建一个ListenableFutureTask对象
    ListenableFutureTask<String> task = ListenableFutureTask.create(new Callable<String>() {
        @Override
        public String call() throws Exception {
            return "fail";
        }
    });

    task.addListener(new Runnable() {
        @Override
        public void run() {
            try {
                System.out.println("result is " + listenableFuture.get());
            } catch (Exception e) {
                e.printStackTrace();
            } 
        }
    }, MoreExecutors.directExecutor());
}

public final class MoreExecutors {
    //  DirectExecutor是个枚举类,以此保证安全单例
    public static Executor directExecutor() {
        return MoreExecutors.DirectExecutor.INSTANCE;
    }
}

本文地址:https://blog.csdn.net/qq_33216694/article/details/107492669