欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

并发容器和框架(Java并发编程的艺术笔记)

程序员文章站 2022-05-04 10:52:18
...

ConcurrentHashMap的实现原理与使用

线程不安全的HashMap

在多线程环境,使用HashMap进行put操作会引起死循环,原因是多线程会导致HashMap的Entry链表形成环状数据结构,进而Entry的next节点永不会空,产生死循环获取Entry。

效率低下的HashTable

HashTable容器使用synchronized来保证线程安全,但在竞争激烈的情况下,当一个线程访问HashTable的同步方法,其他线程尝试访问HashTable的同步方法时,会进入阻塞状态。例如线程1使用put操作时,线程2不能使用put方法和get方法,因此效率低。

ConcurrentHashMap的锁分段技术

HashTable容器效率低的原因是所有线程都必须争夺同一个锁。假如容器里有多把锁,将数据分称一段一段的存储,给每一段数据配一把锁,那么线程间就不会存在锁竞争。这就是ConcurrentHashMap的锁分段技术。

 

ConcurrentHashMap的结构

ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁(ReentrantLock),它扮演锁的角色;HashEntry则用于存储键值对数据。

一个ConcurrentHashMap里包含一个Segment数组。Segment是一种数组和链表结构。一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素。当对HashEntry数组的数据进行修改时, 必须先获得与它对应的Segment锁,

并发容器和框架(Java并发编程的艺术笔记)

 

ConcurrentHashMap相关参数的初始化

1.初始化segments数组

以下是初始化segments数组源代码,segments数组长度ssize是通过concurrencyLevel计算得出的为了能 通过按位与的散列算法来定位segments数组的索引,必须保证segments数组的长度是2的N次方,因此需要计算一个大于或等于concurrencyLevel的最小的2的N次方值 来作为segments数组的长度。

if (concurrencyLevel > MAX_SEGMENTS)
   concurrencyLevel = MAX_SEGMENTS;
    int sshift = 0;
    int ssize = 1;
        while (ssize < concurrencyLevel) {
              ++sshift;
            ssize <<= 1;
        }
    segmentShift = 32 - sshift;
    segmentMask = ssize - 1;
    this.segments = Segment.newArray(ssize);

 

2.初始化段偏移量segmentShift和段掩码segmentMask

这两个变量需要在定位segment时的散列算法里使用,sshift等于segments数组长度ssize从1向左移位的次数,比如默认情况下concurrencyLevel等于16,1需要向左移位移动4次,所以sshift等于4。

segmentShift用于定位参与散列运算的位数,segmentShift等于32减sshift,所以等于28,此处用32是因为ConcurrentHashMap里的hash()方法输出的最大数是32位的。

segmentMask是散列运算的掩码,等于ssize减1,即15。

3.初始化每个segment

输入参数initialCapacity是ConcurrentHashMap的初始化容量,,loadfactor是每个segment的负载因子,在在构造方法里需要通过这两个参数来初始化数组中的每个segment。

下面代码中的变量cap为segment数组的HashEntry数组长度

if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = 1;
        while (cap < c)
            cap <<= 1;
        for (int i = 0; i < this.segments.length; ++i)
            this.segments[i] = new Segment<K,V>(cap, loadFactor);

 

定位Segment

在插入和获取元素时,必须先通过散列算法定位到Segment。如下所示,ConcurrentHashMap会首先使用 Wang/Jenkins hash的变种算法对元素的hashCode进行一次再散列。再散列的原因是减少散列冲突,使元素能均匀分布在不同的Segment中。

    private static int hash(int h) {
        h += (h << 15) ^ 0xffffcd7d;
        h ^= (h >>> 10);
        h += (h << 3);
        h ^= (h >>> 6);
        h += (h << 2) + (h << 14);
        return h ^ (h >>> 16);
    }

ConcurrentHashMap通过以下散列算法定位segment,默认情况下下segmentShift为28,segmentMask为15,再散列后的数最大是32位二进制数据。向右无符号移动28位(hash>>>segmentShift),意思是让高4位参与到散列运算中。

final Segment<K,V> segmentFor(int hash) {
    return segments[(hash >>> segmentShift) & segmentMask];
}

 

ConcurrentHashMap的操作

get操作

如下是Segment的get操作。先经过一次再散列,然后使用这个散列值通过散列运算定位到Segment,再通过散列算法定位到元素

public V get(Object key) {
    int hash = hash(key.hashCode());
    return segmentFor(hash).get(key, hash);
}

可以看到,get方法是不加锁的,因为它将要使用的共享变量都定义成volatile类型,如用于统计当前 Segement大小的count字段和用于存储值的HashEntry的value。它保证能被多线程同时读,但不会读到过期值,但只能被单线程写(如果写入的值不依赖于原值,则可以多线程写)。在get操作里,只需要读而不写共享变量count和value,因此可不用加锁。

定位HashEntry和定位Segment的散列算法虽然一样, 都与数组的长度减去1再相“与”,但是相“与”的值不一样,定位Segment使用的是元素的 hashcode通过再散列后得到的值的高位,而定位HashEntry直接使用的是再散列后的值。其目的 是避免两次散列后的值一样。

hash >>> segmentShift) & segmentMask // 定位Segment所使用的hash算法
int index = hash & (tab.length - 1); // 定位HashEntry所使用的hash算法

 

put操作

put方法首先定位到Segment,然后在Segment里进行插入操作。插入操作首先要判断是否要对Segment里的的HashEntry数组进行扩容;其次是定位添加元素的位 置,然后将其放在HashEntry数组里。

(1)是否需要扩容

在插入元素前会先判断Segment里的HashEntry数组是否超过容量(threshold),如果超过,则对数组进行扩容。

(2)如何扩容

创建一个容量是原来容量两倍的数组,然后将原数组里的元素进 行再散列后插入到新的数组里。,ConcurrentHashMap只对某个segment进行扩容。

 

size操作

统计元素的大小即统计所有Segment里元素的大小count后求和。但考虑到累加前使用的count可能发生变化(几率非常小),所以ConcurrentHashMap的做法是先尝试2次通过不锁住Segment的方式来统计各个Segment大小,如果统计的过程中,容器的count发生了变化,则再采用加锁的方式来统计所有Segment的大小。

ConcurrentHashMap通过modCount变量来判断统计时容器是否发生变化:在put、remove和clean方法里操作元素前都会将变量modCount进行加1,在统计size前后比较modCount是否发生变化即可得知。

 


ConcurrentLinkedQueue

并发编程有时需要使用线程安全的队列,实现队列的方式有两种:

  • 阻塞算法:该队列可以用一个锁 (入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现。
  • 非阻塞算法:循环CAS

ConcurrentLinkedQueue则属于非阻塞方式,它是一个采用FIFO的*线程安全队列。

ConcurrentLinkedQueue的结构

队列由head节点和tail节点,每个节点由节点元素和指向下一元素的引用组成。默认情况下head存储元素为空,tail节点等于head节点

private transient volatile Node<E> tail = head;

入队列

入队列过程

该过程分为两件事:

  1. 将入队节点设置为当前队列尾节点的下一节点;
  2. 更新tail节点,若tail节点的next节点不为空,则将入队节点设置tail节点,否则将入队节点设置为tail的next节点。

如果是多线程同时进行入队操作,则可能出现其他线程插队的情况:例如一线程先获取尾节点,正当它向设置尾节点时,另一个线程插队,则队列的尾节点就会发生变化。这是当前线程会需要暂停入队操作,重新获取尾节点。

    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
// 入队前,创建一个入队节点
        Node<E> n = new Node<E>(e);
        retry:
// 死循环,入队不成功反复入队。
        for (;;) {
// 创建一个指向tail节点的引用
            Node<E> t = tail;
// p用来表示队列的尾节点,默认情况下等于tail节点。
            Node<E> p = t;
            for (int hops = 0; ; hops++) {
// 获得p节点的下一个节点。
                Node<E> next = succ(p);
// next节点不为空,说明p不是尾节点,需要更新p后在将它指向next节点
                if (next != null) {
// 循环了两次及其以上,并且当前节点还是不等于尾节点
                    if (hops > HOPS && t != tail)
                        continue retry;
                    p = next;
                }
// 如果p是尾节点,则设置p节点的next节点为入队节点。
                else if (p.casNext(null, n)) {
/*如果tail节点有大于等于1个next节点,则将入队节点设置成tail节点,
更新失败了也没关系,因为失败了表示有其他线程成功更新了tail节点*/
                    if (hops >= HOPS)
                        casTail(t, n); // 更新tail节点,允许失败
                    return true;
                }
// p有next节点,表示p的next节点是尾节点,则重新设置p节点
                else {
                    p = succ(p);
                }
            }
        }
    }

 

定位尾节点

每次入队都必须通过tail节点来找到尾节点,尾节点可能是tail节点,也可是tail节点的next节点。

获取某节点p的next节点代码如下,若p节点等于p的next节点,则只能是一种情况:p节点和p的next节点都为空,表示该队列刚初始化,正准备添加,因此需要返回head节点。

final Node<E> succ(Node<E> p) {
    Node<E> next = p.getNext();
    return (p == next) head : next;
}

 

设置入队节点为尾节点

p.casNext(null,n)方法(CAS算法)用于将入队节点设置为当前队列尾节点的next节点,如果p是null, 表示p是当前队列的尾节点,如果不为null,表示有其他线程更新了尾节点,则需要重新获取当前队列的尾节点。

 

出队列

该操作是从队列里返回一个节点元素,并清空该节点对元素的引用。

下图是出队列的例子,由图可知,并不是每次出队时都更新head节点,当head节点里由元素时,直接弹出head节点里的元素,而不会更新head节点。只有当head节点里没有元素时,出队操作才会更新head节点。

并发容器和框架(Java并发编程的艺术笔记)

如下是出队代码。首先是获取头节点元素,判断是否为空。若为空,则表示另外一个线程已进行一次出队操作,将该节点元素拿走;若不为空,则使用CAS方式将头节点的引用设置为nul,若CAS成功,则直接返回头节点的元素;若不成功,表示另外一个线程已经 进行了一次出队操作更新了head节点,导致元素发生了变化,需要重新获取头节点。

    public E poll() {
        Node<E> h = head;
// p表示头节点,需要出队的节点
        Node<E> p = h;
        for (int hops = 0;; hops++) {
// 获取p节点的元素
            E item = p.getItem();
// 如果p节点的元素不为空,使用CAS设置p节点引用的元素为null,
// 如果成功则返回p节点的元素。
            if (item != null && p.casItem(item, null)) {
                if (hops >= HOPS) {
// 将p节点下一个节点设置成head节点
                    Node<E> q = p.getNext();
                    updateHead(h, (q != null) q : p);
                }
                return item;
            }
// 如果头节点的元素为空或头节点发生了变化,这说明头节点已经被另外
// 一个线程修改了。那么获取p节点的下一个节点
            Node<E> next = succ(p);
// 如果p的下一个节点也为空,说明这个队列已经空了
            if (next == null) {
// 更新头节点。
                updateHead(h, p);
                break;
            }
// 如果下一个元素不为空,则将头节点的下一个节点设置成头节点
            p = next;
        }
        return null;
    }

 


阻塞队列

(1)当队列满时,队列会阻塞插入元素的线程,直到队列不满。

(2)在队列为空时,获取元素的线程会等待队列变为非空。

当阻塞队列不可用时,插入和移除操作提供了4种处理方式。

并发容器和框架(Java并发编程的艺术笔记)

Java提供了7个阻塞队列:

ArrayBlockingQueue

它一个由数组结构组成的有界阻塞队列,按FIFO的原则对元素排序。

默认情况下不保证线程公平的访问队列,即不保证先阻塞线程先访问队列。如果希望保证公平(这会降低吞吐量),我们可以如下做:

ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);

其公平性是使用可重入锁实现的

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
        this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}

 

LinkedBlockingQueue

一个用链表实现的有界阻塞队列。此队列的默认和最大长度为 Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。

 

PriorityBlockingQueue

是一个支持优先级的*阻塞队列。默认情况下元素采取自然顺序 升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化 PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。需要注意的是不能保证 同优先级元素的顺序。

 

DelayQueue

一个支持延时获取元素的*阻塞队列,在创建元素时可以指定多久才能从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。队列使用PriorityQueue来实现。队中的元素必须实现Delayed接口

(1)如何实现Delayed接口

参考ScheduledThreadPoolExecutor 里ScheduledFutureTask类的实现。

第一步:在对象创建时,用time记录当前对象延迟到什么时候可以使用,使用sequenceNumber来标识元素在队列中的先后顺序。

ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    super(r, result);
    this.time = ns;
    this.period = period;
    this.sequenceNumber = sequencer.getAndIncrement();
}

第二步:实现getDeley方法,该方法返回当前元素还需延时多长时间,单位为纳秒。

public long getDelay(TimeUnit unit) {
    return unit.convert(time - now(), TimeUnit.NANOSECONDS);
}

第三步:实现compareTo方法来指定元素的顺序。例如,让延时时间最长的放在队列的末尾。

 public int compareTo(Delayed other) {
        if (other == this) // compare zero ONLY if same object
            return 0;
        if (other instanceof ScheduledFutureTask) {
            ScheduledFutureTask<> x = (ScheduledFutureTask<>)other;
            long diff = time - x.time;
            if (diff < 0)
                return -1;
            else if (diff > 0)
                return 1;
            else if (sequenceNumber < x.sequenceNumber)
                return -1;
            else
                return 1;
        }
        long d = (getDelay(TimeUnit.NANOSECONDS) -
                other.getDelay(TimeUnit.NANOSECONDS));
        return (d == 0) 0 : ((d < 0) -1 : 1);
    }

(2)如何实现延时阻塞队列

当消费者从队列里获取元素时,若元素没有达到延时时间,则阻塞当前线程。

如下所示,leader变量是一个等待获取队列头部元素的线程,若leader不为空,表示已有线程在等待获取队列头元素,使用await()方法让当前线程等待;若为空,则把当前线程设置为leader,并用awaitNanos()方法让当前线程等待接收信号或等待delay时间。

    long delay = first.getDelay(TimeUnit.NANOSECONDS);
    if (delay <= 0)
        return q.poll();
    else if (leader != null)
        available.await();
    else {
        Thread thisThread = Thread.currentThread();
        leader = thisThread;
        try {
            available.awaitNanos(delay);
        } finally {
        if (leader == thisThread)
            leader = null;
        }
    }

 

SynchronousQueue

一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作, 否则不能继续添加元素。队列本身并不存储任何元素,非常适合传递性场景。SynchronousQueue的吞吐量高于 LinkedBlockingQueue和ArrayBlockingQueue。

它支持公平访问队列。默认情况下线程非公平性访问队列。

 

LinkedTransferQueue

一个由链表结构组成的*阻塞TransferQueue队列。相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。

(1)transfer方法

若当前有消费者正等待接受数据,该方法可以把生产者传入的元素立即传输给消费者;若没有等待的消费者,则将元素放在队列的tail节点,并等到该元素被消费者消费了才返回。

该方法关键部分如下,第一行试图把存放当前元素s作为tail节点;第二行让CPU自旋等待消费者消费元素。自旋一定的次数后使用Thread.yield()方法来暂停 当前正在执行的线程,并执行其他线程。

Node pred = tryAppend(s, haveData);
return awaitMatch(s, pred, e, (how == TIMED), nanos);

(2)tryTransfer方法

该方法用来试探生产者传入的元素是否能直接传给消费者。如果没有消费者等 待接收元素,则返回false。

 

LinkedBlockingDeque

一个由链表组成的双向阻塞队列,所谓双向队列指的是可以 从队列的两端插入和移出元素。由于多了一个操作队列的入口,在多线程同时入队 时,也就减少了一半的竞争。

相比其他的阻塞队列,LinkedBlockingDeque多了addFirst、 addLast、offerFirst、offerLast、peekFirst和peekLast等方法。

 

阻塞队列的实现原理

(1)通知模式

 当生产者往往满的队列里添加元素时会阻塞住生 产者,当消费者消费了一个队列中的元素后,会通知生产者当前队列可用。

下面是ArrayBlockingQueue的源码,它使用了Condition来实现。

    private final Condition notFull;
    private final Condition notEmpty;

 public ArrayBlockingQueue(int capacity, boolean fair) {
// 省略其他代码
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            insert(e);
        } finally {
            lock.unlock();
        }
    }
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return extract();
        } finally {
            lock.unlock();
        }
    }
    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
    }

await()方法主要通过LockSupport.park(this)实现,而继续进入park()方法,会发现调用setBlocker先保存一下将要阻塞的线程,然后调用unsafe.park阻塞 当前线程。

public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    unsafe.park(false, 0L);
    setBlocker(t, null);
}

unsafe.park是个native方法(由非java语言实现),这个方法会阻塞当前线程,只有如下4种情况的一种发生,该方法才会返回:

  • 与park对应的unpark执行或已经执行时。“已经执行”是指unpark先执行,然后再执行park 的情况。
  • ·线程被中断时。
  • ·等待完time参数指定的毫秒数时。
  • ·异常现象发生时,这个异常现象没有任何原因。

 


Fork/Join框架 

 Fork/Join是一个用于并行执行任务的框架,是一个把大任务分割成若干 个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

工作窃取算法 

假设我们需要做一个比较大的任务,可以将该任务分割为若干互不依赖的子任务,为了减少线程间竞争,将子任务放到不同队列里,并为每个队列创建一个单独的线程来执行。但是,有的线程会先把队列里的任务做完,而其他线程对应的队列里还有任务等待处理。因此,干完活的线程就会去其他线程的队列里窃取一个任务来执行。为了减少窃取任务线程和被窃取线程之间的竞争,通常使用双端队列,被窃取的线程永远从双端队列头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

缺点:在某些情况仍存在竞争,比如双端队列里只有一个任务。并且该算法会消耗更多的系统资源,比如创建多个线程和多个双端队列。

 

Fork/Join框架的设计

Fork/Join使用两个类来完成分割任务和执行任务并合并结果两件事情:

ForkJoinTask:首先要创建一个ForkJoin任务,它提供任务中执行fork()和join()操作。通常我们需要继承它的子类:

  • RecursiveAction:用于没有返回结果的任务。
  • RecursiveTask:用于有返回结果的任务。

ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。任务分割的子任务会添加到当前工作线程维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任 务。

 

使用Fork/Join框架

通过框架来计算1到4的累加和

public class Test {
    public static class CountTask extends RecursiveTask<Integer> {
        private static final int MAX_Num = 2;
        private int start;
        private int end;

        public CountTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Integer compute() {
            int sum = 0;
            boolean canCompute = (end - start) <= MAX_Num;
            if(canCompute) {
                for(int i = start; i <= end; i++) {
                    sum += i;
                }
            }else {                 //任务分割
                int mid = (start + end) / 2;
                CountTask leftTask = new CountTask(start, mid);
                CountTask rightTask = new CountTask(mid + 1, end);
//                执行子任务
                leftTask.fork();
                rightTask.fork();
                int leftRes = leftTask.join();
                int rightRes = rightTask.join();
                sum = leftRes + rightRes;
            }
            return sum;
        }
    }

    public static void main(String[] args) throws Exception{
        ForkJoinPool pool = new ForkJoinPool();
//        生成计算任务,计算1到4的累加和
        CountTask task = new CountTask(1, 4);
        Future<Integer> result = pool.submit(task);
        try {
            System.out.println(result.get());
        } catch (InterruptedException e) {
        } catch (ExecutionException e) {
        }

    }
}

 

Fork/Join框架的实现原理

ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。

(1)ForkJoinTask的fork方法实现原理

当我们调用ForkJoinTask的fork方法时,程序会调用ForkJoinWorkerThread的pushTask方法 异步地执行这个任务,然后立即返回结果。代码如下。

public final ForkJoinTask<V> fork() {
    ((ForkJoinWorkerThread) Thread.currentThread()).pushTask(this);
    return this;
}

pushTask方法把当前任务存放在ForkJoinTask数组队列里。然后再调用ForkJoinPool的 signalWork()方法唤醒或创建一个工作线程来执行任务。代码如下。

final void pushTask(ForkJoinTask<> t) {
    ForkJoinTask<>[] q;
     int s, m;
    if ((q = queue) != null) { // ignore if queue removed
        long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
        UNSAFE.putOrderedObject(q, u, t);
        queueTop = s + 1; // or use putOrderedInt
        if ((s -= queueBase) <= 2)
            pool.signalWork();
        else if (s == m)
            growQueue();
    }
}

(2)ForkJoinTask的join方法实现原理

Join方法的主要作用是阻塞当前线程并等待获取结果。

    public final V join() {
        if (doJoin() != NORMAL)
            return reportResult();
        else
            return getRawResult();
    }
    private V reportResult() {
        int s; Throwable ex;
        if ((s = status) == CANCELLED)
            throw new CancellationException();
        if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
            UNSAFE.throwException(ex);
        return getRawResult();
    }

它首先调用了doJoin()方法,通过doJoin()方法得到当前任务的状态来判断返回什么结 果,任务状态有4种:

已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和出现异常 (EXCEPTIONAL)。

再看下doJoin()方法。该方法首先查看任务状态,若执行完成则直接返回任务状态;若没有,则从任务数组里取出任务并执行。。如果任务顺利执行 完成,则设置任务状态为NORMAL,如果出现异常,则记录异常,并将任务状态设置为 EXCEPTIONAL。

    private int doJoin() {
        Thread t; 
        ForkJoinWorkerThread w; 
        int s; 
        boolean completed;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
            if ((s = status) < 0)
                return s;
            if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
                try {
                    completed = exec();
                } catch (Throwable rex) {
                    return setExceptionalCompletion(rex);
                }
                if (completed)
                    return setCompletion(NORMAL);
            }
            return w.joinTask(this);
        }
        else
            return externalAwaitDone();
    }

 

Fork/Join框架的异常处理

ForkJoinTask在执行过程可能会抛出异常,ForkJoinTask提供了了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被 取消了,并且可以通过ForkJoinTask的getException方法获取异常。

if(task.isCompletedAbnormally()) {
    System.out.println(task.getException());
}

getException方法返回Throwable对象,如果任务被取消了则返回CancellationException。如 果任务没有完成或者没有抛出异常则返回null