您现在的位置是: 首页  >  IT编程


程序员文章站 2024-03-13 21:08:04


 import java.util.concurrent.callable;
import java.util.concurrent.executionexception;
import java.util.concurrent.executorservice;
import java.util.concurrent.executors;
import java.util.concurrent.future;

public class addtask implements callable<integer> {

 private int a,b;
 public addtask(int a, int b) {
 this.a = a;
 this.b = b;
 public integer call throws exception {
 integer result = a + b;
 return result;
 public static void main(string[] args) throws interruptedexception, executionexception {
 executorservice executor = executors.newsinglethreadexecutor;
 future<integer> future = executor.submit(new addtask(1, 2));
 integer result = future.get;// 只有当future的状态是已完成时(future.isdone = true),get方法才会返回

虽然可以实现获取异步执行结果的需求,但是我们发现这个future其实很不好用,因为它没有提供通知的机制,也就是说我们不知道future什么时候完成(如果我们需要轮询isdone()来判断的话感觉就没有用这个的必要了)。看下java.util.concurrent.future.future 的接口方法:

 public interface future<v> {
  boolean cancel(boolean mayinterruptifrunning);
  boolean iscancelled;
  boolean isdone;
  v get throws interruptedexception, executionexception;
  v get(long timeout, timeunit unit)
    throws interruptedexception, executionexception, timeoutexception;


 package future;

import java.util.concurrent.cancellationexception;
import java.util.concurrent.future;
import java.util.concurrent.timeunit;

 * the result of an asynchronous operation.
 * @author lixiaohui
 * @param <v> 执行结果的类型参数
public interface ifuture<v> extends future<v> { 
 boolean issuccess; // 是否成功 
 v getnow; //立即返回结果(不管future是否处于完成状态)
 throwable cause; //若执行失败时的原因
    boolean iscancellable; //是否可以取消
 ifuture<v> await throws interruptedexception; //等待future的完成
 boolean await(long timeoutmillis) throws interruptedexception; // 超时等待future的完成
 boolean await(long timeout, timeunit timeunit) throws interruptedexception;
    ifuture<v> awaituninterruptibly; //等待future的完成,不响应中断
    boolean awaituninterruptibly(long timeoutmillis);//超时等待future的完成,不响应中断
 boolean awaituninterruptibly(long timeout, timeunit timeunit);
 ifuture<v> addlistener(ifuturelistener<v> l); //当future完成时,会通知这些加进来的监听器
 ifuture<v> removelistener(ifuturelistener<v> l);


 public class object {
   * causes the current thread to wait until another thread invokes the
   * {@link java.lang.object#notify} method or the
   * {@link java.lang.object#notifyall} method for this object.
   * in other words, this method behaves exactly as if it simply
   * performs the call {@code wait(0)}.
   * 调用该方法后,当前线程会释放对象监视器锁,并让出cpu使用权。直到别的线程调用notify/notifyall
  public final void wait throws interruptedexception {

   * wakes up all threads that are waiting on this object's monitor. a
   * thread waits on an object's monitor by calling one of the
   * {@code wait} methods.
   * <p>
   * the awakened threads will not be able to proceed until the current
   * thread relinquishes the lock on this object. the awakened threads
   * will compete in the usual manner with any other threads that might
   * be actively competing to synchronize on this object; for example,
   * the awakened threads enjoy no reliable privilege or disadvantage in
   * being the next thread to lock this object.
  public final native void notifyall;

知道这个后,我们要自己实现future也就有了思路,当线程调用了ifuture.await等一系列的方法时,如果future还未完成,那么就调用future.wait 方法使线程进入waiting状态。而当别的线程设置future为完成状态(注意这里的完成状态包括正常结束和异常结束)时,就需要调用future.notifyall方法来唤醒之前因为调用过wait方法而处于waiting状态的那些线程。完整的实现如下(代码应该没有很难理解的地方,我是参考netty的future机制的。有兴趣的可以去看看netty的源码):

 package future;

import java.util.collection;
import java.util.concurrent.cancellationexception;
import java.util.concurrent.copyonwritearraylist;
import java.util.concurrent.executionexception;
import java.util.concurrent.timeunit;
import java.util.concurrent.timeoutexception;

 * <pre>
 * 正常结束时, 若执行的结果不为null, 则result为执行结果; 若执行结果为null, 则result = {@link abstractfuture#success_signal}
 * 异常结束时, result为 {@link causeholder} 的实例;若是被取消而导致的异常结束, 则result为 {@link cancellationexception} 的实例, 否则为其它异常的实例
 * 以下情况会使异步操作由未完成状态转至已完成状态, 也就是在以下情况发生时调用notifyall方法:
 * <ul>
 * <li>异步操作被取消时(cancel方法)</li>
 * <li>异步操作正常结束时(setsuccess方法)</li>
 * <li>异步操作异常结束时(setfailure方法)</li>
 * </ul>
 * </pre>
 * @author lixiaohui
 * @param <v>
 * 异步执行结果的类型
public class abstractfuture<v> implements ifuture<v> {

 protected volatile object result; // 需要保证其可见性
     * 监听器集
 protected collection<ifuturelistener<v>> listeners = new copyonwritearraylist<ifuturelistener<v>>;

 * 当任务正常执行结果为null时, 即客户端调用{@link abstractfuture#setsuccess(null)}时, 
 * result引用该对象
 private static final successsignal success_signal = new successsignal;

 public boolean cancel(boolean mayinterruptifrunning) {
 if (isdone) { // 已完成了不能取消
  return false;

 synchronized (this) {
  if (isdone) { // double check
  return false;
  result = new causeholder(new cancellationexception);
  notifyall; // isdone = true, 通知等待在该对象的wait的线程
 notifylisteners; // 通知监听器该异步操作已完成
 return true;
 public boolean iscancellable {
 return result == null;
 public boolean iscancelled {
 return result != null && result instanceof causeholder && ((causeholder) result).cause instanceof cancellationexception;

 public boolean isdone {
 return result != null;

 public v get throws interruptedexception, executionexception {
 await; // 等待执行结果

 throwable cause = cause;
 if (cause == null) { // 没有发生异常,异步操作正常结束
  return getnow;
 if (cause instanceof cancellationexception) { // 异步操作被取消了
  throw (cancellationexception) cause;
 throw new executionexception(cause); // 其他异常

 public v get(long timeout, timeunit unit) throws interruptedexception, executionexception, timeoutexception {
 if (await(timeout, unit)) {// 超时等待执行结果
  throwable cause = cause;
  if (cause == null) {// 没有发生异常,异步操作正常结束
  return getnow;
  if (cause instanceof cancellationexception) {// 异步操作被取消了
  throw (cancellationexception) cause;
  throw new executionexception(cause);// 其他异常
 // 时间到了异步操作还没有结束, 抛出超时异常
 throw new timeoutexception;

 public boolean issuccess {
 return result == null ? false : !(result instanceof causeholder);

 public v getnow {
 return (v) (result == success_signal ? null : result);

 public throwable cause {
 if (result != null && result instanceof causeholder) {
  return ((causeholder) result).cause;
 return null;

 public ifuture<v> addlistener(ifuturelistener<v> listener) {
 if (listener == null) {
  throw new nullpointerexception("listener");
 if (isdone) { // 若已完成直接通知该监听器
  return this;
 synchronized (this) {
  if (!isdone) {
  return this;
 return this;

 public ifuture<v> removelistener(ifuturelistener<v> listener) {
 if (listener == null) {
  throw new nullpointerexception("listener");

 if (!isdone) {

 return this;

 public ifuture<v> await throws interruptedexception {
 return await0(true);

 private ifuture<v> await0(boolean interruptable) throws interruptedexception {
 if (!isdone) { // 若已完成就直接返回了
  // 若允许终端且被中断了则抛出中断异常
  if (interruptable && thread.interrupted) {
  throw new interruptedexception("thread " + thread.currentthread.getname + " has been interrupted.");

  boolean interrupted = false;
  synchronized (this) {
  while (!isdone) {
   try {
   wait; // 释放锁进入waiting状态,等待其它线程调用本对象的notify/notifyall方法
   } catch (interruptedexception e) {
   if (interruptable) {
    throw e;
   } else {
    interrupted = true;
  if (interrupted) {
  // 为什么这里要设中断标志位?因为从wait方法返回后, 中断标志是被clear了的, 
  // 这里重新设置以便让其它代码知道这里被中断了。
 return this;
 public boolean await(long timeoutmillis) throws interruptedexception {
 return await0(timeunit.milliseconds.tonanos(timeoutmillis), true);
 public boolean await(long timeout, timeunit unit) throws interruptedexception {
 return await0(unit.tonanos(timeout), true);

 private boolean await0(long timeoutnanos, boolean interruptable) throws interruptedexception {
 if (isdone) {
  return true;

 if (timeoutnanos <= 0) {
  return isdone;

 if (interruptable && thread.interrupted) {
  throw new interruptedexception(tostring);

 long starttime = timeoutnanos <= 0 ? 0 : system.nanotime;
 long waittime = timeoutnanos;
 boolean interrupted = false;

 try {
  synchronized (this) {
  if (isdone) {
   return true;

  if (waittime <= 0) {
   return isdone;

  for (;;) {
   try {
   wait(waittime / 1000000, (int) (waittime % 1000000));
   } catch (interruptedexception e) {
   if (interruptable) {
    throw e;
   } else {
    interrupted = true;

   if (isdone) {
   return true;
   } else {
   waittime = timeoutnanos - (system.nanotime - starttime);
   if (waittime <= 0) {
    return isdone;
 } finally {
  if (interrupted) {

 public ifuture<v> awaituninterruptibly {
 try {
  return await0(false);
 } catch (interruptedexception e) { // 这里若抛异常了就无法处理了
  throw new java.lang.internalerror;
 public boolean awaituninterruptibly(long timeoutmillis) {
 try {
  return await0(timeunit.milliseconds.tonanos(timeoutmillis), false);
 } catch (interruptedexception e) {
  throw new java.lang.internalerror;

 public boolean awaituninterruptibly(long timeout, timeunit unit) {
 try {
  return await0(unit.tonanos(timeout), false);
 } catch (interruptedexception e) {
  throw new java.lang.internalerror;

 protected ifuture<v> setfailure(throwable cause) {
 if (setfailure0(cause)) {
  return this;
 throw new illegalstateexception("complete already: " + this);

 private boolean setfailure0(throwable cause) {
 if (isdone) {
  return false;

 synchronized (this) {
  if (isdone) {
  return false;
  result = new causeholder(cause);

 return true;

 protected ifuture<v> setsuccess(object result) {
 if (setsuccess0(result)) { // 设置成功后通知监听器
  return this;
 throw new illegalstateexception("complete already: " + this);

 private boolean setsuccess0(object result) {
 if (isdone) {
  return false;

 synchronized (this) {
  if (isdone) {
  return false;
  if (result == null) { // 异步操作正常执行完毕的结果是null
  this.result = success_signal;
  } else {
  this.result = result;
 return true;

 private void notifylisteners {
 for (ifuturelistener<v> l : listeners) {

 private void notifylistener(ifuturelistener<v> l) {
 try {
 } catch (exception e) {

 private static class successsignal {


 private static final class causeholder {
 final throwable cause;

 causeholder(throwable cause) {
  this.cause = cause;


 package future.test;

import future.ifuture;
import future.ifuturelistener;

 * 延时加法
 * @author lixiaohui
public class delayadder {
 public static void main(string[] args) {
 new delayadder.add(3 * 1000, 1, 2).addlistener(new ifuturelistener<integer> {
  public void operationcompleted(ifuture<integer> future) throws exception {
 * 延迟加
 * @param delay 延时时长 milliseconds
 * @param a 加数
 * @param b 加数
 * @return 异步结果
 public delayadditionfuture add(long delay, int a, int b) {
 delayadditionfuture future = new delayadditionfuture; 
 new thread(new delayadditiontask(delay, a, b, future)).start;
 return future;
 private class delayadditiontask implements runnable {

 private long delay;
 private int a, b;
 private delayadditionfuture future;
 public delayadditiontask(long delay, int a, int b, delayadditionfuture future) {
  this.delay = delay;
  this.a = a;
  this.b = b;
  this.future = future;

 public void run {
  try {
  integer i = a + b;
  // todo 这里设置future为完成状态(正常执行完毕)
  } catch (interruptedexception e) {
  // todo 这里设置future为完成状态(异常执行完毕)
} package future.test;

import future.abstractfuture;
import future.ifuture;
public class delayadditionfuture extends abstractfuture<integer> {
 public ifuture<integer> setsuccess(object result) {
 return super.setsuccess(result);
 public ifuture<integer> setfailure(throwable cause) {
 return super.setfailure(cause);

