欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

阻塞队列 — SynchronousQueue源码分析

程序员文章站 2022-06-09 20:11:58
点赞再看,养成习惯,公众号搜一搜【一角钱技术】关注更多原创技术文章。本文 GitHub org_hejianhui/JavaStudy 已收录,有我的系列文章。前言SynchronousQueue 一个不存储元素的阻塞队列,每一个 put 操作必须等待 take 操作,否则不能继续添加元素。支持公平锁和非公平锁2种策略来访问队列。默认是采用非公平性策略访问队列。公平性策略底层使用了类似队列的数据结构,而非公平策略底层使用了类似栈的数据结构。SynchronousQueue的吞吐量高于Linke.....

阻塞队列 — SynchronousQueue源码分析

点赞再看,养成习惯,公众号搜一搜【一角钱技术】关注更多原创技术文章。
本文 GitHub org_hejianhui/JavaStudy 已收录,有我的系列文章。

前言

阻塞队列 — SynchronousQueue源码分析

SynchronousQueue 一个不存储元素的阻塞队列,每一个 put 操作必须等待 take 操作,否则不能继续添加元素。支持公平锁和非公平锁2种策略来访问队列。默认是采用非公平性策略访问队列。公平性策略底层使用了类似队列的数据结构,而非公平策略底层使用了类似栈的数据结构。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。

队列创建

SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>();

应用场景

SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合传递性场景。

SynchronousQueue 的一个使用场景是在线程池里。Executors.newCachedThreadPool() 就使用了 SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。

我们来看一个具体的例子:

package com.niuh.queue.synchronous;


import java.util.Random;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class TestSynchronousQueue {

    public static void main(String[] args) {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>();
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);
        Thread t1 = new Thread(consumer);
        Thread t2 = new Thread(producer);
        t1.start();
        t2.start();
    }

}

/**
 * 模拟生产者
 */
class Producer implements Runnable {

    SynchronousQueue<Integer> queue = null;


    public Producer(SynchronousQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        int rand = new Random().nextInt(1000);
        System.out.println(String.format("模拟生产者:%d", rand));

        try {
            TimeUnit.SECONDS.sleep(3);
            queue.put(rand);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(queue.isEmpty());
    }
}

/**
 * 模拟消费者
 */
class Consumer implements Runnable {

    SynchronousQueue<Integer> queue = null;

    public Consumer(SynchronousQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        System.out.println("消费者已经准备好接收元素了...");
        try {
            System.out.println(String.format("消费一个元素:%d", queue.take()));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("================================================");
    }
}

工作原理

由于SynchronousQueue的支持公平策略和非公平策略,所以底层有两种数据结构

  • 队列(实现公平策略),有一个头结点和尾结点,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;
  • 栈(实现非公平策略),有一个头结点(为默认策略),同时配合一个LIFO队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。

队列与栈都是通过链表来实现的。具体的数据结构如下:
阻塞队列 — SynchronousQueue源码分析

源码分析

定义

SynchronousQueue的类继承关系如下:
阻塞队列 — SynchronousQueue源码分析
其包含的方法定义如下:
阻塞队列 — SynchronousQueue源码分析

成员属性

// CPU的数量
static final int NCPUS = Runtime.getRuntime().availableProcessors();
// 有超时的情况自旋多少次,当CPU数量小于2的时候不自旋
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
// 没有超时的情况自旋多少次,是指定超时限制的请求的自旋次数的16倍
static final int maxUntimedSpins = maxTimedSpins * 16;
// 针对有超时的情况,自旋了多少次后,如果剩余时间大于1000纳秒就使用带时间的LockSupport.parkNanos()这个方法
static final long spinForTimeoutThreshold = 1000L;
// 传输器,即两个线程交换元素使用的东西
private transient volatile Transferer<E> transferer;

通过属性我们可以看到两个点:

  1. 这个阻塞队列里面是会自旋的;
  2. 它使用了一个叫做transferer的东西来交换元素。

为什么需要自旋这个操作?

  • 因为线程 挂起 唤醒站在cpu角度去看的话,是非常耗费资源的,涉及到用户态和内核态的切换…
  • 自旋的好处,自旋期间线程会一直检查自己的状态是否被匹配到,如果自旋期间被匹配到,那么直接就返回了
  • 如果自旋期间未被匹配到,自旋次数达到某个指标后,还是会将当前线程挂起的…

当一个平台只有一个CPU时,你觉得还需要自旋么?

  • 肯定不需要自旋了,因为一个cpu同一时刻只能执行一个线程,自旋没有意义了,而且你还站着cpu 其它线程没办法执行,这个栈的状态更不会改变了,当只有一个cpu时会直接选择 LockSupport.park() 挂起等待者线程。

主要内部类

  • Transferer是TransferStack栈和TransferQueue队列的公共类,定义了转移数据的公共操作,由TransferStack和TransferQueue具体实现。

阻塞队列 — SynchronousQueue源码分析

// Transferer抽象类,主要定义了一个transfer方法用来传输元素
abstract static class Transferer<E> {
    abstract E transfer(E e, boolean timed, long nanos);
}
// 以栈方式实现的Transferer
static final class TransferStack<E> extends Transferer<E> {
    // 栈中节点的几种类型:
    // 1. 消费者(请求数据的)
    static final int REQUEST    = 0;
    // 2. 生产者(提供数据的)
    static final int DATA       = 1;
    // 3. 二者正在匹配中
    static final int FULFILLING = 2;

    // 栈中的节点
    static final class SNode {
        // 下一个节点
        volatile SNode next;        // next node in stack
        // 匹配者
        volatile SNode match;       // the node matched to this
        // 等待着的线程
        volatile Thread waiter;     // to control park/unpark
        // 元素
        Object item;                // data; or null for REQUESTs
        // 模式,也就是节点的类型,是消费者,是生产者,还是正在匹配中
        int mode;
    }
    // 栈的头节点
    volatile SNode head;
}
// 以队列方式实现的Transferer
static final class TransferQueue<E> extends Transferer<E> {
    // 队列中的节点
    static final class QNode {
        // 下一个节点
        volatile QNode next;          // next node in queue
        // 存储的元素
        volatile Object item;         // CAS'ed to or from null
        // 等待着的线程
        volatile Thread waiter;       // to control park/unpark
        // 是否是数据节点
        final boolean isData;
    }

    // 队列的头节点
    transient volatile QNode head;
    // 队列的尾节点
    transient volatile QNode tail;
}
  1. 定义了一个抽象类Transferer,里面定义了一个传输元素的方法;
  2. 有两种传输元素的方法,一种是栈,一种是队列;
  3. 栈的特点是后进先出,队列的特点是先进先出;
  4. 栈只需要保存一个头节点就可以了,因为存取元素都是操作头节点;
  5. 队列需要保存一个头节点一个尾节点,因为存元素操作尾节点,取元素操作头节点;
  6. 每个节点中保存着存储的元素、等待着的线程,以及下一个节点;
  7. 栈和队列两种方式有什么不同呢?请看下面的分析。
  • WaitQueue、LifoWaitQueue、FifoWaitQueue表示为了兼容JDK1.5版本中的SynchronousQueue的序列化策略所遗留的,这里不做具体的讲解。

阻塞队列 — SynchronousQueue源码分析

构造函数

public SynchronousQueue() {
    // 默认非公平模式
    this(false);
}

public SynchronousQueue(boolean fair) {
    // 如果是公平模式就使用队列,如果是非公平模式就使用栈
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
  1. 默认使用非公平模式,也就是栈结构;
  2. 公平模式使用队列,非公平模式使用栈;

入队方法

SynchronousQueue 提供了多个入队的方法,但内部都是通过调用transferer的transfer()方法,传入元素e,说明是生产者。

  • put(E e),将指定的元素插入此队列中,无返回值
  • offer(E e),将指定的元素插入到此队列中,在成功时返回 true
  • offer(E e, long timeout, TimeUnit unit)

put(E e)

public void put(E e) throws InterruptedException {
    // 元素不可为空
    if (e == null) throw new NullPointerException();
    // 直接调用传输器的transfer()方法
    // 三个参数分别是:传输的元素,是否需要超时,超时的时间
    if (transferer.transfer(e, false, 0) == null) {
        // 如果传输失败,直接让线程中断并抛出中断异常
        Thread.interrupted();
        throw new InterruptedException();
    }
}

offer(E e)

public boolean offer(E e) {
    // 元素不可为空
	if (e == null) throw new NullPointerException();
    // 三个参数分别是:传输的元素,是否需要超时,超时的时间
	return transferer.transfer(e, true, 0) != null;
}

offer(E e, long timeout, TimeUnit unit)

public boolean offer(E e, long timeout, TimeUnit unit)
	throws InterruptedException {
    // 元素不可为空
	if (e == null) throw new NullPointerException();
    // 直接调用传输器的transfer()方法,传输成功返回true
    // 三个参数分别是:传输的元素,是否需要超时,超时的时间
    if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
    	return true;
    // 如果线程没有中断直接返回false
	if (!Thread.interrupted())
		return false;
    throw new InterruptedException();
}

出队方法

SynchronousQueue 提供了多个出队的方法,但内部都是通过调用transferer的transfer()方法,传入null,说明是消费者。

  • take()
  • poll()
  • poll(long timeout, TimeUnit unit)

take()

public E take() throws InterruptedException {
    // 直接调用传输器的transfer()方法
    // 三个参数分别是:null,是否需要超时,超时的时间
    // 第一个参数为null表示是消费者,要取元素
    E e = transferer.transfer(null, false, 0);
    // 如果取到了元素就返回
    if (e != null)
        return e;
    // 否则让线程中断并抛出中断异常
    Thread.interrupted();
    throw new InterruptedException();
}

poll()

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
 	// 直接调用传输器的transfer()方法
	// 三个参数分别是:null,是否需要超时,超时的时间
	// 第一个参数为null表示是消费者,要取元素
 	E e = transferer.transfer(null, true, unit.toNanos(timeout));
 	// 如果取到了元素 或者 线程没有并中断 就返回
	if (e != null || !Thread.interrupted())
		return e;
	throw new InterruptedException();
}

poll(long timeout, TimeUnit unit)

public E poll() {
	// 直接调用传输器的transfer()方法
	// 三个参数分别是:null,是否需要超时,超时的时间
	// 第一个参数为null表示是消费者,要取元素
	return transferer.transfer(null, true, 0);
}

非公平的堆栈(默认策略)

栈元素

put 的时候,就往栈中放数据。take 的时候,就从栈中取数据,两者操作都是在栈顶上操作数据.

/** 栈中的节点 */
static final class SNode {
	// 下一个节点
	volatile SNode next;        // next node in stac
    // 匹配者
	volatile SNode match;       // the node matched to this
	// 等待着的线程
    volatile Thread waiter;     // to control park/unpark
  	// 元素
  	Object item;                // data; or null for REQUESTs
    // 模式,也就是节点的类型,是消费者,是生产者,还是正在匹配中
  	int mode;
}
  • volatile SNode next 栈顶的下一个节点
  • volatile SNode match undefined匹配,用来判断阻塞栈元素能被唤醒的时机 比如我们先执行 take,此时队列中没有数据,take 被阻塞了,栈元素为 SNode1 当 put 时,会把当前 put 的栈元素赋值给 SNode1 的 match 属性,并唤醒 take 当 take 被唤醒,发现 SNode1 的 match 属性有值时,就能拿到 put 的数据
  • volatile Thread waiter 阻塞的线程
  • Object item 未投递/未消费的消息

transfer

TransferStack 内部类的 transfer 方法

@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
    SNode s = null; // constructed/reused as needed
    
    // e 为空: take 方法,非空: put 方法
    int mode = (e == null) ? REQUEST : DATA;
    
    // 自旋
    for (;;) {
        // 头节点情况分类
        // 1:为空,说明队列中还没有数据
        // 2:非空,并且是 take 类型的,说明头节点线程正等着拿数据
        // 3:非空,并且是 put 类型的,说明头节点线程正等着放数据
        SNode h = head;
        
        // 栈头为空,说明队列中还没有数据。
        // 栈头非空且栈头的类型和本次操作一致
        //	比如都是 put,那么就把本次 put 操作放到该栈头的前面即可,让本次 put 能够先执行
        if (h == null || h.mode == mode) {  // empty or same-mode
            // 设置了超时时间,并且 e 进栈或者出栈要超时了,
            // 就会丢弃本次操作,返回 null 值。
            // 如果栈头此时被取消了,丢弃栈头,取下一个节点继续消费
            if (timed && nanos <= 0) {      // 无法等待
                // 栈头操作被取消
                if (h != null && h.isCancelled())
                    // 丢弃栈头,把栈头的后一个元素作为栈头
                    casHead(h, h.next);     // 将取消的节点弹栈
                // 栈头为空,直接返回 null
                else
                    return null;
            // 没有超时,直接把 e 作为新的栈头
            } else if (casHead(h, s = snode(s, e, h, mode))) {
                // e 等待出栈,一种是空队列 take,一种是 put
                SNode m = awaitFulfill(s, timed, nanos);
                if (m == s) {               // wait was cancelled
                    clean(s);
                    return null;
                }
                // 本来 s 是栈头的,现在 s 不是栈头了,s 后面又来了一个数,把新的数据作为栈头
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // help s's fulfiller
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
        // 栈头正在等待其他线程 put 或 take
        // 比如栈头正在阻塞,并且是 put 类型,而此次操作正好是 take 类型,走此处
        } else if (!isFulfilling(h.mode)) { // try to fulfill
            // 栈头已经被取消,把下一个元素作为栈头
            if (h.isCancelled())            // already cancelled
                casHead(h, h.next);         // pop and retry
            // snode 方法第三个参数 h 代表栈头,赋值给 s 的 next 属性
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                for (;;) { // loop until matched or waiters disappear
                    // m 就是栈头,通过上面 snode 方法刚刚赋值
                    SNode m = s.next;       // m is s's match
                    if (m == null) {        // all waiters are gone
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    SNode mn = m.next;
                     // tryMatch 非常重要的方法,两个作用:
                     // 1 唤醒被阻塞的栈头 m,2 把当前节点 s 赋值给 m 的 match 属性
                     // 这样栈头 m 被唤醒时,就能从 m.match 中得到本次操作 s
                     // 其中 s.item 记录着本次的操作节点,也就是记录本次操作的数据
                    if (m.tryMatch(s)) {
                        casHead(s, mn);     // pop both s and m
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else                  // lost match
                        s.casNext(m, mn);   // help unlink
                }
            }
        } else {                            // help a fulfiller
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                if (m.tryMatch(h))          // help match
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}

执行流程:

  1. 判断是 put 方法还是 take 方法;
  2. 判断栈头数据是否为空,如果为空或者栈头的操作和本次操作一致,是的话走3,否则走5;
  3. 判断操作有无设置超时时间,如果设置了超时时间并且已经超时,返回 null,否则走4;
  4. 如果栈头为空,把当前操作设置成栈头,或者栈头不为空,但栈头的操作和本次操作相同,也把当前操作设置成栈头,并看看其它线程能否满足自己,不能满足则阻塞自己。比如当前操作是 take,但队列中没有数据,则阻塞自己;
  5. 如果栈头已经是阻塞的,需要别人唤醒的,判断当前操作能和唤醒栈头,可以唤醒走6,否则走4;
  6. 把自己当作一个节点,赋值到栈头的 match 属性上,并唤醒栈头节点;
  7. 栈头被唤醒后,拿到 match 属性,就是把自己唤醒的节点的信息,返回。

awaitFulfill

节点阻塞的方法

/**
 * 旋转/阻止,直到节点s通过执行操作匹配。
 * @param s 等待的节点
 * @param timed true if timed wait
 * @param nanos 超时时间
 * @return 匹配的节点, 或者是 s 如果被取消
 */
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
	
    // deadline 死亡时间,如果设置了超时时间的话,死亡时间等于当前时间 + 超时时间,否则就是 0
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    // 自旋的次数,如果设置了超时时间,会自旋 32 次,否则自旋 512 次。
    // 比如本次操作是 take 操作,自旋次数后,仍无其他线程 put 数据
    // 就会阻塞,有超时时间的,会阻塞固定的时间,否则一直阻塞下去
    int spins = (shouldSpin(s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        // 当前线程有无被打断,如果过了超时时间,当前线程就会被打断
        if (w.isInterrupted())
            s.tryCancel();

        SNode m = s.match;
        if (m != null)
            return m;
        if (timed) {
            nanos = deadline - System.nanoTime();
            // 超时了,取消当前线程的等待操作
            if (nanos <= 0L) {
                s.tryCancel();
                continue;
            }
        }
        // 自选次数减1
        if (spins > 0)
            spins = shouldSpin(s) ? (spins-1) : 0;
        // 把当前线程设置成 waiter,主要是通过线程来完成阻塞和唤醒
        else if (s.waiter == null)
            s.waiter = w; // establish waiter so can park next iter
        else if (!timed)
            // park 阻塞
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

当一个 节点/线程 将要阻塞时,它会设置其 waiter 自动,然后在真正 park 之前至少再检查一次状态,从而涵盖了竞争与实现者的关系,并注意到 waiter 非空,因此应将其唤醒。

当由出现在调用点位于堆栈顶部的节点调用时,对停放的调用之前会进行旋转,以避免在生产者和消费者及时到达时阻塞。这可能只足以在多处理器上发生。

从主循环返回的检查顺序反映了这样一个事实,即优先级:中断 > 正常的返回 > 超时。(因此,在超时时,在放弃之前要进行最后一次匹配检查)除了来自非定时 SynchronousQueue 的调用。{poll / offer} 不会检查中断,根本不等待,因此陷入了转移方法中,而不是调用 awaitFullfill 方法。

而且可以发现其阻塞策略,并不少一上来就阻塞住,而是在自旋一定次数后,仍然没有其它线程来满足自己的要求时,才会真正的阻塞。

图解非公平模型

  • 线程 put1 执行 put(1) 操作,由于当前无配对的消费线程,所以 put1 线程入栈,自旋一小会后睡眠等待

阻塞队列 — SynchronousQueue源码分析

  • 接着,线程 put2 再次执行 put(2) 操作,put2 线程入栈,自旋一小会后睡眠等待

阻塞队列 — SynchronousQueue源码分析

  • 这时候,来了一个线程 take1,执行 take 操作,这时候发现栈顶为 put2 线程,匹配成功,但是实现会先把 take1 线程入栈,然后 take1 线程循环执行匹配 put2 线程逻辑,一旦发现没有并发冲突,就会把栈顶指针直接指向 put1 线程

阻塞队列 — SynchronousQueue源码分析

  • 最后,再来一个线程 take2,执行 take 操作,这跟上一步的逻辑基本一致,take2 线程入栈,然后再循环中匹配 put1 线程,最终全部匹配完毕,栈空

阻塞队列 — SynchronousQueue源码分析
从上面流程看出,虽然 put1 线程先入栈了,但是确实后匹配,这就是非公平策略。

公平的队列

队列元素

/** 队列中的节点 */
static final class QNode {
	// 下一个节点
	volatile QNode next;          // next node in queue
	// 存储的元素
	volatile Object item;         // CAS'ed to or from null
	// 等待着的线程
	volatile Thread waiter;       // to control park/unpark
	// 是否是数据节点
	final boolean isData;
 }
 
// 队列的头节点
transient volatile QNode head;
// 队列的尾节点
transient volatile QNode tail;
  • volatile QNode next 当前元素的下一个元素
  • volatile Object item // CAS’ed to or from null 当前元素的值,如果当前元素被阻塞住了,等其他线程来唤醒自己时,其他线程会把自己 set 到 item 里面
  • volatile Thread waiter // to control park/unpark 阻塞线程
  • final boolean isData true 是 put,false 是 take

transfer

TransferQueue 内部类的 transfer 方法

E transfer(E e, boolean timed, long nanos) {
    /**
     *
     * 这个基本方法, 主要分为两种情况
     *
     * 1. 若队列为空 / 队列中的尾节点和自己的 类型相同, 则添加 node
     *      到队列中, 直到 timeout/interrupt/其他线程和这个线程匹配
     *      timeout/interrupt awaitFulfill方法返回的是 node 本身
     *      匹配成功的话, 要么返回 null (producer返回的), 或正真的传递值 (consumer 返回的)
     *
     * 2. 队列不为空, 且队列的 head.next 节点是当前节点匹配的节点,
     *      进行数据的传递匹配, 并且通过 advanceHead 方法帮助 先前 block 的节点 dequeue
     */

    QNode s = null; // 根据需要构造/重用
    // true:put  false:get
    boolean isData = (e != null);

    for (;;) {
        // 队列首尾的临时变量,队列空时,t=h
        QNode t = tail;
        QNode h = head;
        if (t == null || h == null) // 看到未初始化的值
            continue;               // 自旋
        // 首尾节点相同,队列空
        // 或队尾节点的操作和当前节点操作相同
        if (h == t || t.isData == isData) {
            QNode tn = t.next;
            // tail 被修改,重试
            if (t != tail)
                continue;
            // 队尾后面的值还不为空,说明其他线程添加了 tail.next,t 还不是队尾,直接把 tn 赋值给 t
            if (tn != null) {
                advanceTail(t, tn);
                // 自旋
                continue;
            }
            // 超时直接返回 null
            if (timed && nanos <= 0)        // 等不及了
                return null;
            // 创建节点
            if (s == null)
                s = new QNode(e, isData);
            // 如果把 s 放到队尾失败,继续递归放进去
            if (!t.casNext(null, s))        // 链接失败
                continue;

            advanceTail(t, s);              // 推进 tail 节点并等待
            // 阻塞住自己,直到有其他线程与之匹配, 或它自己进行线程的中断
            Object x = awaitFulfill(s, e, timed, nanos);
            if (x == s) {                   // wait was cancelled
    			/**
                 * 对接点 s 进行清除,若 s 不是链表的最后一个节点,则直接 CAS 进行 节点的删除;
                 * 若 s 是链表的最后一个节点,则 要么清除以前的 cleanMe 节点(cleamMe != null),
                 * 然后将 s.prev 设置为 cleanMe 节点,下次进行删除 或直接将 s.prev 设置为cleanMe
                 */            
                clean(t, s); 
                return null;
            }

            if (!s.isOffList()) {           // 尚未取消链接
                advanceHead(t, s);          // unlink if head 推进head 节点, 下次就调用 s.next 节点进行匹配(这里调用的是 advanceHead, 因为代码能执行到这边说明s已经是 head.next 节点了)
                if (x != null)              // and forget fields
                    s.item = s;
                s.waiter = null;
            }
            return (x != null) ? (E)x : e;
        // 队列不为空,并且当前操作和队尾不一致
        // 也就是说当前操作是队尾是对应的操作
        // 比如说队尾是因为 take 被阻塞的,那么当前操作必然是 put
        } else {                            // complementary-mode
            // 如果是第一次执行,此处的 m 代表就是 tail
            // 也就是这行代码体现出队列的公平,每次操作时,从头开始按照顺序进行操作
            QNode m = h.next;               // node to fulfill
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read

            Object x = m.item;
            if (isData == (x != null) ||    // m already fulfilled
                x == m ||                   // m cancelled
                // m 代表栈头
                // 这里把当前的操作值赋值给阻塞住的 m 的 item 属性
                // 这样 m 被释放时,就可得到此次操作的值
                !m.casItem(x, e)) {         // lost CAS
                advanceHead(h, m);          // dequeue and retry
                continue;
            }
            // 当前操作放到队头
            advanceHead(h, m);              // successfully fulfilled
            // 释放队头阻塞节点
            LockSupport.unpark(m.waiter);
            return (x != null) ? (E)x : e;
        }
    }
}

线程被阻塞住后,当前线程是如何把直接的数据传给阻塞线程的?

假设线程1 从队列中 take 数据,被阻塞,变成阻塞线程 A ,然后线程 2 开始往队列中 put 数据 B,大致的流程如下:

  • 线程 1 从队列 take 数据,发现队列内无数据,于是被阻塞,成为 A ;
  • 线程 2 往队尾 put 数据,会从队尾往前找到第一个被阻塞的节点,假设此时能找到的就是节点 A,然后将线程 B 将 put 的数据放到节点 A 的 item 属性里面,并唤醒线程 1 ;
  • 线程 1 被唤醒后,就能从 A.item 里面拿到线程2 put 的数据了,线程1 成功返回。

在这个过程中,公平主要体现在,每次 put 数据的时候,都 put 到队尾上,而每次拿数据时,并不是直接从队头拿数据,而是从队尾往前寻找第一个被阻塞的线程,这样就会按照顺序释放被阻塞的线程。

avanceTail

  • 尝试 cas 将 nt 作为新的tail
void advanceTail(QNode t, QNode nt) {
	if (tail == t)
		UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}

图解公平队列模型

公平模式下,底层实现使用的是 TransferQueue 队列,它有一个 head 和 tail 指针,用于指向当前正在等待匹配的线程节点。

  • 初始化时的 TransferQueue

阻塞队列 — SynchronousQueue源码分析

  • 线程 put1 执行 put(1),由于当前没有配对的消费线程,所以 put1 线程入队,自旋一小会后睡眠等待

阻塞队列 — SynchronousQueue源码分析

  • 接着,线程 put2 执行 put(2) ,put2 线程入队,自旋一小会后睡眠等待

阻塞队列 — SynchronousQueue源码分析

  • 这时来了一个线程 take1,执行了 take,由于 tail 指向 put2 线程,put2 线程跟 take1 线程匹配,这时 take1 线程不需要入队

注意:这时需要唤醒的线程并不是 put2,而是 put1.
因为现在是公平策略,谁先入队,谁优先被唤醒,这里显然 put1 应该优先被唤醒.
公平策略总结就一句话:队尾匹配队头出队

  • 指向后 put1 线程被唤醒,take1 线程的 take() 方法返回了 1 (put1线程的数据),这样就实现了线程间的一对一通信

阻塞队列 — SynchronousQueue源码分析

  • 最后,再来一个线程 take2,执行 take 操作,这时候只有 put2 线程再等候,而且两个线程匹配上了,线程 put2 被唤醒,take2 线程 take 操作返回了2(线程put2的数据),这时候队列又返回到起点。

阻塞队列 — SynchronousQueue源码分析

总结

SynchronousQueue 是*的,是一种无缓冲的等待队列,但是由于该Queue本身的特性,在某次添加元素后必须等待其他线程取走后才能继续添加;可以认为SynchronousQueue是一个缓存值为1的阻塞队列,但是 isEmpty()方法永远返回是true,remainingCapacity() 方法永远返回是0,remove()和removeAll() 方法永远返回是false,iterator()方法永远返回空,peek()方法永远返回null。

SynchronousQueue 其有独特的线程配对通信机制,在平常开发中不太会用到,但在线程池技术 Executors.newCachedThreadPool() 就使用了 SynchronousQueue,内部没有使用AQS,而是直接使用CAS。

SynchronousQueue 内没有容器为什么还能够存储一个元素呢?
因为内部没有容器指的是没有像数组那样的内存空间存多个元素,但是是有单地址内存空间,用于交换数据.

PS:以上代码提交在 Githubhttps://github.com/Niuh-Study/niuh-juc-final.git

文章持续更新,可以公众号搜一搜「 一角钱技术 」第一时间阅读, 本文 GitHub org_hejianhui/JavaStudy 已经收录,欢迎 Star。

本文地址:https://blog.csdn.net/org_hjh/article/details/109955522