欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

荐 Java多线程-FutureTask

程序员文章站 2022-07-02 23:03:29
简介本篇 是Java 多线程的 开篇,这篇 主要简单的聊了下线程的几种创建方式,然后从源码的角度去分析了下FutureTask,FutureTask这个类 既和Callable有关系,又和Runnable有关联,并且是实现了Future这个接口,能够很好的管理我们的Callable或者Runnable。线程我们简单说下线程:线程是操作系统能够调度的最小单位,它是被包含的在进程中执行的,一个进程中可以包含多个线程,每个线程执行不同的任务。线程的几种状态线程的几种创建关于 这个问题 有人说 三种创...

简介

本篇 是Java 多线程的 开篇,这篇 主要简单的聊了下线程的状态,几种创建方式,然后从源码的角度去分析了下FutureTask,FutureTask这个类 既和Callable有关系,又和Runnable有关联,并且是实现了Future这个接口,能够很好的管理我们的Callable或者Runnable。

线程

我们简单说下线程:线程是操作系统能够调度的最小单位,它是被包含的在进程中执行的,一个进程中可以包含多个线程,每个线程执行不同的任务。

线程的几种状态

 public enum State {
        NEW,
        RUNNABLE,
        BLOCKED,
        WAITING,
        TIMED_WAITING,
        TERMINATED;
    }
  1. 初始状态New: 线程创建的初始状态 就是我们new 一个线程出来 还没start的时候状态
  2. 运行状态Runnable:线程调用Start方法 此时线程状态变成就绪状态(ready),这个时候就需要等待CPU去调度分配可执行的时间片,如果分配到就变成running状态,Runnable是ready和runnding的一个总称吧~ 都叫运行中。。。
  3. 阻塞状态Blocked: 这个状态是进入到Synchronized方法中 或者代码块中,此时线程需要等待去获取monitor lock对象,顺带说下monitor对象,我们每个对象在JVM 里面 都会有一个与之对应的monitor和其关联,我们都知道 Synchronized 是通过monitorEnter 和monitorexit 实现的,当JVM执行到monitorEnter的时候 会去获取当前对象关联的monitor对象,那当前的monitor就会变成锁定状态,后面的线程在执行到这边的时候,就需要等待去获取monitor对象,此时的线程状态就变成了阻塞状态
  4. 等待状态Waiting: 等线程遇到Object#wait,LockSupport#park或者Thread#join 等这些方法的时候,线程就是进入到等待状态,等待唤醒的信号!
  5. 有限等待状态Time-Waiting:这个是一个有限等待,上面我列举的方法中,都有一个有等待时间的等待的方法,执行这个方法的时候线程状态就是Time-Waiting
  6. 终止状态Terminated 线程执行完成后的状态

线程的几种创建

关于 这个问题 有人说 三种创建方式 有人说是2种。
看了oracle的官网 其实说了 也就是2种创建方法:https://docs.oracle.com/en/java/javase/13/docs/api/java.base/java/lang/Thread.html

直接继承Thread接口

这个是第一种方式 就是直接继承Thread接口

class MyThread extends Thread {

    public MyThread() {
        this.setName("MyThread");
    }

    @Override
    public void run() {
        System.out.printf("【%s】 is runing", Thread.currentThread().getName());
        super.run();
    }

    public static void main(String[] args) {
        MyThread myThread=new MyThread();
        myThread.start();
    }

}

Runnable

   <!--Runnable.java-->
   public interface Runnable {
    public abstract void run();
   }

   public static void main(String[] args) {

       Runnable runnable=new Runnable() {
           @Override
           public void run() {
               System.out.printf("【%s】 is runing", Thread.currentThread().getName());
           }
       };

       Thread runnableThread=new Thread(runnable);
        runnableThread.start();
    }

Callable

其实这中方式 和 上面的那种 是本质是一样的,为什么会这么说呢?

Callable 是要配合Future接口来使用的,正常我们使用的方式是这样的

      <!--Callable.java-->
      public interface Callable<V> {
          V call() throws Exception;
      }

       Callable<String> callable = new Callable<String>() {
            @Override
            public String call() throws Exception {
                return Thread.currentThread().getName() + "---burgxun";
            }
        };

        FutureTask<String> futureTask = new FutureTask(callable);
        Thread callableThread = new Thread(futureTask);
        callableThread.start();

        String msg = futureTask.get();
        System.out.println(msg);

FutureTask

FutureTask 看下这个类的继承结构

public class FutureTask<V> implements RunnableFuture<V> {}
public interface RunnableFuture<V> extends Runnable, Future<V>{}

FutureTask 本质是实现了Runnable和Future接口的 所以 我们在new Thread()的时候 能传入FutureTask对象,Future接口我们知道 是用来获取Callable 执行的结果的,所以我们可以使用FutureTask#get的方法可以获取Callable 执行后的结果

FutureTask 对象是一个是一个对任务进行管理的类,有2中任务,一个是有返回值的Callable,还有一个是无返回值的Runable,看下它的构造函数

    /** 这个就是FutureTask 要执行的的Callable */
    private Callable<V> callable;
    /** 这个就是存储运行结果的地方 用get获取的值就是从outcome这边取的*/
    private Object outcome; // non-volatile, protected by state reads/writes
    /** 运行 FutureTask 的线程 */
    private volatile Thread runner;
   
   public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
  
   public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

首先 我们看到 FutureTask 有2个构造函数 一个是callable的还有一个是runnable的, 其中我们看到 FutureTask中一个callable的变量 就是用来存储要执行的任务, 这边还有一个要注意的地方就是 Executors.callable(runnable, result) 这边 这个就是使用了Executors类,一个任务执行的工具类,我们具体来看下代码:

    <!--Executors.java-->
    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }

    <!--RunnableAdapter是Executors的内部类-->
    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

看下 其实RunnableAdapter的实现也很简单,RunnableAdapter是一个Runnable的是适配器,用来将Runnable转化为Callable的,构造函数的入参result,我们正常都不会用到,传null 就好。

Future

public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

看下Future接口定义的方法 我们能看到Get方法 还有一个待阻塞时间的get方法,还有用来判断任务状态的2个方法,还有取消任务的方法,FutureTask继承了RunnableFuture,RunnableFuture实现了Future,那FutureTask也间接的实现了Future接口。

Get方法

那我们看下 我们获取Callable运行结果值的get方法 在FutureTask中是怎么实现的

    private volatile int state;
    private static final int NEW          = 0;//
    private static final int COMPLETING   = 1;//进行中状态
    private static final int NORMAL       = 2;//任务完成状态
    private static final int EXCEPTIONAL  = 3;//任务执行异常
    private static final int CANCELLED    = 4;//任务取消
    private static final int INTERRUPTING = 5;//任务中断中
    private static final int INTERRUPTED  = 6;//任务被中断

    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get() throws InterruptedException, ExecutionException {
        int s = state;//state 是任务的状态
        if (s <= COMPLETING)// 如果是小于等于COMPLETING 那state的状态值 只有 new,COMPLETING
            s = awaitDone(false, 0L);//加入到等待中 是无限等待
        return report(s);
    }
    
     private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)//NORMAL 状态说明是执行任务执行完成了  从outcome中获取值
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

    <!--获取值的线程 加入到等待链表中-->
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;//等待的截止时间,timed是false 值是0
        WaitNode q = null;//等待的node 是 一个内部类
        boolean queued = false;
        for (;;) {//自旋 放waitNode节点
            <!--在等待过程中 用来检测 线程的中断状态   为了响应中断状态-->
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;//当先 任务的状态
            if (s > COMPLETING) {// 如果任务状态 已经大于1 那就是后面的状态  那就算任务执行完成 退出循环
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // 如果任务还在执行中 那就调用yield 让出当前的分配到的CPU的时间片,重新去竞争
                Thread.yield();
            else if (q == null)//到了这边 那s的状态应该是0,也就是new 状态,这边创建等待节点WaitNode
                q = new WaitNode();
            else if (!queued)
                <!--利用CAS的方式 将当前的waitNode节点放入的到waiters的后置节点中 第一次 都会执行-->
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);//如果 没设置截止时间的等待 线程就被阻塞,线程状态变成waiting ,等待任务执行完成后的唤醒
        }
    }

看完了 上面的流程 我们知道了 我们在用get方法获取值的时候,是怎么做到的,其实就是从FutureTask中的outcome字段中获取的。get方法 虽然叫get,但是当任务没有执行完成没法获取到返回值的时候 ,是会阻塞线程的!

run方法

通过Get 方法 我们知道了 结果是从outcome中来获取值得,那outcome的值又是怎么得到的呢,我们看下run方法是怎么做的

    public void run() {
        <!--判断下 当前任务的状态 如果是非new,说明任务已经被启动 就直接返回;如果CAS设置当前任务的执行线程失败,那也返回~-->
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {//这边再判断下 任务的状态 必须是初始状态New
                V result;
                boolean ran;//任务是否执行完成
                try {
                    result = c.call();//执行任务的主体逻辑
                    ran = true;//执行完成
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);//如果任务执行完成 就set 的到的Result结果值
            }
        }
    }
    
    protected void set(V v) {
        //修改当前FutureTask中任务的执行状态 执行中  这边很奇怪 是修改了状态是执行中 而不是NORMAL 执行完成状态 应该在这边outcome 还没有赋值,所以不算任务执行完成
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 刚说完 上面的  这边就来了 这变修改了任务的状态,NORMAL是任务的最终状态
            finishCompletion();//任务完成后 要执行的动作 应该就是唤醒阻塞的线程 去获取值
        }
    }
    
     /**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     * 英文注释 写的很清楚  移除and唤醒所有的等待线程,执行done()方法,并且设置FutureTask的callable为null
     */
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);// 唤醒等待的节点
                    }
                    <!--这是常规的链表 移除节点的操作  是一个单向链表-->
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc  设置null 让当前节点和链表断开,帮助GC 去回收当前节点  应该是可达性分析算法,断掉后 当前节点就是不可达,就是要回收的对象
                    q = next;// 
                }
                break;
            }
        }
        done();// 是一个空方法 可以去子类继承实现
        callable = null;        // 当前任务直接执行完成,任务主体设置为null
    }

通过上面的Run方法是注释 应该清楚了 是怎么去赋值outCome,然后又是怎么去唤醒等待的阻塞线程,最终通过get去获取到值

总结

相信这篇文章 能帮助大家理解FutureTask 是做什么的,怎么去和Runnable,Callable关联起来的,Callable又是怎么去获取到执行到结果的!

本文地址:https://blog.csdn.net/zxlp520/article/details/107287762