欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

《Java并发编程的艺术》第六章 Java并发容器和框架

程序员文章站 2022-05-05 09:38:37
...


第六章 Java并发容器和框架

框架图

高清图片地址
《Java并发编程的艺术》第六章 Java并发容器和框架


ConcurrentHashMap的实现原理与使用

为什么要使用ConcurrentHashMap

其他HashMap方法的问题

  • HashMap的并发编程会导致死循环。
  • 线程安全的HashTable效率低下。

HashMap分析

  • 多线程情况下,并发执行put会导致HashMap的Entry链表形成环形数据结构,一旦形成了环形,Entry的next节点永不为空,就会死循环。(Entry是啥?等看完了HashMap源码再研究研究)。
  • 案例:《Java并发编程的艺术》第六章 Java并发容器和框架

HashTable分析
HashTable使用sync来保证线程安全,然而当一个线程访问HashTable同步方法时(Put),其他的线程就会进入阻塞状态(既不能Put也不能Get),所以效率很低。(对所有共享内存锁住了,sync是总线锁吗?忘记了)

ConcurrentHashMap可行原因
ConcurrentHashMap使用了分段锁技术,即将数据分成一段一段地存储,然后给每段分配一把锁,当一个线程占用锁访问某一段时,其他段的数据也能被其他线程访问。


ConcurrentHashMap的结构

ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成:

  • Segment:一种可重入锁,扮演锁的角色,是一种数组和链表的结构,包含一个HashEntry数组,Segment守护这个数组。
  • HashEntry:一个链表结构,存储键值对数据,当对HashEntry数组的数据进行修改时,必须先获得与之对应的Segment锁。

ConcurrentHashMap类图
《Java并发编程的艺术》第六章 Java并发容器和框架

ConcurrentHashMap结构图
《Java并发编程的艺术》第六章 Java并发容器和框架


ConcurrentHashMap的初始化

1. 初始化segments数组
concurrencyLevel是segment容器的序号吗?

  • 源码
    if (concurrencyLevel > MAX_SEGMENTS){
        concurrencyLevel = MAX_SEGMENTS;
    }
    int sshift = 0;
    int ssize = 1;
    // ssize只要2的倍数
    while (sshift < concurrencyLevel){
        ++sshift;
        ssize <<= 1;
    }
    segmentShift = 32 - sshift;
    segmentMask = ssize - 1;
    this.segments = Segment.newArray(ssize);
    
  • 说明
    • 为了通过按位与的散列算法来计算segments数组的索引,必须保证segments数组的长度是2的N次方。(所以是while循环中左移一位,就是乘2)。
    • ssize也就是容器里的锁的数量。
    • concurrentcyLevel的最大值是65535,所以数组的最大程度是65536。
    • 默认情况下concurrencyLevel等于16。

2. 初始化segmentShift和segmentMash

  • segmentShift:sshift等于ssize从1向左移动的次数,segmentShfit用于计算参与散列计算的数值的位数,ConcurrentHashMap的hash()方法输出的最大数是32位,所以segmentShift = 32 - sshift,(移动的地方都可以置0,不参与计算?)。
  • segmentMask:散列运算的掩码,等于ssize-1,掩码的各个位置都是1。

3. 初始化每个segment

  • 源码
    if (initialCapacity > MAXIMUM_CAPACITY){
        initialCapacity = MAXIMUM_CAPACITY;
    }
    // 计算segment里的每个HashEntry长度
    int c = initialCapacity / ssize;
    // 能多不能少
    if (c * ssize < initialCapacity){
        ++c;
    }
    // 数组长度应为2的倍数
    int cap = 1;
    while (cap < c){
        cap <<= 1;
    }
    // 给每个segment分配,loadFactor是分配因子
    for (int i=0; i < this.segments.length; ++i){
        this.segments[i] = new Segment<K, V>(cap, loadFactor);
    }
    
  • 说明:segment的容量threshold = (int) cap * loadFactor,默认情况下initialCapacity等于16,loadfactory等于0.75。

定位Segment

虽然segment里面分成了好多数组,但是每个put的元素插入哪个数组是hash决定的,而不是按照顺序来?答:对
还是说第一步找segment和第二步在segment里找数组都是用Hash做的?答:对

目的:要分段,就得在插入和获取元素的时候,先通过散列算法定位到Segment。

再散列:会显示用一个方法对hash值进行再散列,目的是减少散列冲突,是元素能够均匀地分布在不同的Segment上,从而提高容器的存取效率。

定位算法
默认情况下segmentShift为28,segmentMask为15,下面hash最大32位,向右移动28位只比较高四位(只有四位有效,那为什么是高4位而不是低4位?还是说都一样?应该不一样,往右移动的话,只有右边四个有数字,此时用 & segmentMask,可以得到一个数值),segmentMask是四个1,也就是15,容器一共16个,也就是对15求余,可得出一个值(0-15)。

final Segment<K, V> segmentFor(int hash){
    return segments[(hash >>> segmentShift) & segmentMask];
}

ConcurrentHashMap的操作

1. get操作

  • 源码
    public V get(Object key){
        int hash = hash(key.hashCode());
        return segmentFor(hash).get(key, hash);
    }
    
    // 对比
    (hash >>> segmentShift) & segmentMask    // 定位segment所使用的hash算法
    int index = hash & (tab.length - 1);     // 定位HashEntry所使用的hash算法
    
  • 说明
    • 先经过一次再散列,然后通过这个散列定位到segment,然后再通过segment的散列算法定位到元素。
    • segment的定位有两个元素,一个hash,一个key,这个hash用来寻找segment中对应的HashEntry,然后用key在HashEntry中找到value。此处注意找segment和找HashEntry的hash方法不同,如上面的代码。作者说不取高位的目的是避免两次散列后的值相同(为啥会有这样的效果?)。
    • get的高效在于整个get过程不需要加锁,除非读到的值是空的才会加锁重读(不是安全的吗, 为什么这里要加锁?为什么会读到空值,没赋值?跟线程安全有什么关系?),因为get方法里要使用的共享变量都是volatile类型(segment大小字段和HashEntry里面的value),而volatile类型的读是可以多线程的,写的话只能单线程,如果两个线程同时修改和读volatile变量,也会因为JMM的happen before原则,先写再读。这是volatile替换锁的经典应用。

2. put操作
因为是写入操作,所以为了线程安全要加锁,方法流程是先定位到Segment,然后在Segment里面进行插入,插入分两步,第一步判断是否要进行HashEntry的扩容,第二步是定位添加元素位置,加入到HashEntry中。

  • 扩容判断:插入前检查Segment里的HashEntry数组是否超过容量(threshold),超过了就扩容。HashMap是插入后判断是否到头了,然后扩容,但是又可能扩了后又没有新元素插入。
  • 如何扩容:先创建一个容量是原来两倍的数组(即一个大小为原来二倍的HashEntry),然后将原数组里的元素进行再散列插入到新数组中。为了高效,不会对整个容器扩容,只会对某个segmetn扩容。(数组的容量就是数组长度,容器就是全部segment包含的内容吧,对某个segmetn扩容是扩里面全部的HashEntry,还是只扩一个?)。

3. size操作

  • 方法:计算整个的大小,就要统计所有Segment里面的count然后求和,如果将所有操作锁住的话,效率会很低,再加上累加count的时候count变化的概率很小,所以就有了这种做法:先尝试两次不锁Sement的方式统计大小,如果统计过程中容器的count发生了变化,则采用加锁的方式计算。
  • 判断容器变化:使用一个modCount变量,影响count的操作都会让该变量数值+1,所以统计size的时候比较前后的modCount就能知道容器大小是否发生变化。(妙啊,我还想着用CAS,但是CAS也没法保证经过之后count又变化了)。

ConcurrentLinkedQueue

循环CAS不需要锁吗?

ConcurrentLinkedQueue是一种用非阻塞方式实现的线程安全队列。

ConcurrentLinkedQueue类图
默认情况下head节点存储的元素为空,tail节点等于head节点。
《Java并发编程的艺术》第六章 Java并发容器和框架


入队列

1. 入队列的过程

  • 介绍:就是将入队节点添加到队列的尾部。
  • 过程图《Java并发编程的艺术》第六章 Java并发容器和框架
  • 说明:入队跟普通的还挺不一样的,最重要的一点是:tail节点并不总是尾节点。入队分两步,第一步就是将入队的节点设置成尾节点的下一个,常规操作。第二步就得判断了,如果插入前tail节点后面还有个节点,那么插入后tail就指向新插入的节点(如插入元素2);如果插入前tail节点就是最后一个节点,那么插入后tail不移动,并不指向最后一个节点(如插入元素3)。
  • 源码:跟书上的不一样,实现变了,没看懂
    public boolean offer(E e) {
        checkNotNull(e);
        // 新建入队节点
        final Node<E> newNode = new Node<E>(e);
        // 死循环,p初始化指向tail,但p的意义是尾节点
        for (Node<E> t = tail, p = t;;) {
            // q是尾节点后面的点
            Node<E> q = p.next;
            if (q == null) {
                // q=null说明tail也是尾节点,此时应t=q
                // 用null来判断,如果又线程抢先插入,则p.next不为null,不执行
                // 那咋办?位置都不对了?
                if (p.casNext(null, newNode)) {
                    // p不等于t,说明在这之前其他线程做了插入工作
                    // 那么此时,tail后面已经不为空了,再插入一个点,tail就要指向最后一个了
                    if (p != t) // hop two nodes at a time
                        // 失败也没有关系,失败表示其他线程更新了tail
                        // (就是第二次操作也没抢到)
                        casTail(t, newNode);  // Failure is OK.
                    // 这里虽然tail不是自己更新的,但只要更新了就行,且节点已经插入了,
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // 只有一种情况,就是刚初始化,都为空
                p = (t != (t = tail)) ? t : head;
            else
                // q不为空的情况,tail的next的next不为空,按理来说tail应该是最后一个了
                // 这里说明在这之前又被插入了,后面多了(通过循环,来保证能让p和t是最后)
                // 如果tail不是最后一个,p是最后一个(合理),那么更新tail为最后一个,且p也是最后一个
                // 如果tail和p都是最后一个,p往后移一个
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }
  • 书上源码《Java并发编程的艺术》第六章 Java并发容器和框架
  • 说明
    • 做了两件事,第一件事是定位出尾节点;第二是使用CAS算法将入队节点设置成尾节点的next节点,不成功就重试。
    • offer前被抢先offer怎么办?答:最后一个else解决。
    • 第二个else没看懂。答:下面说了

2. 定位尾节点

  • 找下一个节点代码《Java并发编程的艺术》第六章 Java并发容器和框架

3. 设置入队节点为尾节点
如果不是null就说明其他线程更新了尾节点,需要重新获取当前队列的尾节点。

4. HOPS的设计意图
就是为什么不把tail直接设置成尾节点?目的是为了提高效率,如果tail是尾节点,那么每次都要使用循环CAS更新tail节点。现在的方法让tail节点和尾节点距离大于一个常量才更新tail节点,这样CAS更新的次数就像少了。就是用更多的循环和判断来代替CAS写操作,虽然循环和判断多了些volatile读的操作(读tail),但是volatile读的开销远小于volatile写,所以总的入队效率还是提升了。

注意:入队方法返回的永远都是true。


出队列

功能:从队列开头返回一个节点元素,并清空该节点对元素的引用。

特点:head的更新跟tail一样,也不是立马更新的。当出的时候,如果head指向里面有节点,那就移除这个节点,head不动;如果head指向为空,那么取出这个位置后面的节点,并让head指向next的next。目的也是为了减少CAS消耗。(头节点为空了,next怎么找?)

快照图
《Java并发编程的艺术》第六章 Java并发容器和框架

源码

public E poll() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;

            if (item != null && p.casItem(item, null)) {
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

书上源码
《Java并发编程的艺术》第六章 Java并发容器和框架


Java中的阻塞队列

附加支持阻塞插入和移除方法的队列

  • 支持阻塞的插入方法:队列满时,队列会阻塞插入元素的线程,直到队列不满
  • 支持阻塞移除的方法:当队列为空,获取元素的线程会等待队列变为非空

常用场景:生产者与消费者场景。

阻塞队列不可用时的四种处理方式
《Java并发编程的艺术》第六章 Java并发容器和框架

《Java并发编程的艺术》第六章 Java并发容器和框架


Java里的阻塞队列

《Java并发编程的艺术》第六章 Java并发容器和框架

ArrayBlockingQueue

特点:用数组实现的、有界阻塞队列、按先进先出原则排序、默认不保证线程公平。

公平性实现
使用可重入锁实现的
《Java并发编程的艺术》第六章 Java并发容器和框架


LinkedBlockingQueue

特点:用链表实现、有界阻塞、默认和最大长度为Integer.MAX_VALUE、按先进先出原则对元素排序。


PriorityBlockingQueue

特点:支持优先级、*阻塞、自然顺序升序排序(就是放进来的顺序?)

指定排序规则

  • 自定义实现compareTo()方法来指定元素排序规则。
  • 初始化时,指定构造参数Comparator来对元素进行排序。
  • 注意:同优先级的元素排序不能保证

DelayQueue

特点:支持延时获取元素、*阻塞、元素必须实现Delayed接口,指定过多久才能从队列中获取当前元素,只有到时间了才能获取到。

应用场景

  • 缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使线程循环查询,没到时间查不到,查到了说明缓存有效期到了。(那查到的不也是老的,不够新啊)
  • 定时任务调度:使用DelayQueue保存当天会执行的任务和执行的时间。

如何实现Delayed接口

  • 第一步:初始化数组,使用time记录延迟到什么时候可以使用,使用sequenceNumber来标识元素在队列中的顺序。
    《Java并发编程的艺术》第六章 Java并发容器和框架
  • 第二步:实现getDelay方法,返回当前元素还需要延时多长时间,单位是纳秒。
    《Java并发编程的艺术》第六章 Java并发容器和框架
  • 第三步:实现compareTo来指定元素的顺序,下面的例子就是把延时时间最长的放在队列的末尾。
    《Java并发编程的艺术》第六章 Java并发容器和框架

如何实现延时阻塞队列

  • 思路:当消费者从队列里获取元素时,如果没有达到延时时间,就阻塞线程
  • 代码
    《Java并发编程的艺术》第六章 Java并发容器和框架
  • 说明:先检查延迟,如果时间过了(返回为负),就取出队列头部元素;leader是一个等待获取队列头部元素的线程,如果不为空说明已经有线程在等待头部了,所以当前线程就要等待;如果为空的话,那么自己就能争取,把当前线程设为leader,然后使用延迟等待来设置唤醒时间。

SynchronousQueue

特点:不存储元素、每一个put必须等待一个take(生产之前,要有消费者才行,没有就不生产,等着),否则不能添加元素、默认非公平

进出原则设置
《Java并发编程的艺术》第六章 Java并发容器和框架

适用场景

  • 可以看成一个传球手,负责把生产者线程处理的数据直接传递给消费者线程,适用于传递性场景。
  • SynchronousQueue的吞吐量高于LinkedBlockedQueue和ArrayBlockedQueue。

LinkedTransferQueue

特点:由链表结构组成、*阻塞、多了tryTransfertransfer方法。

transfer方法

  • 如果有消费者在等待接收元素,那么就立即把生产者传入的元素传输给消费者。
  • 如果没有消费者在等待,该方法会把元素存放在队列的tail节点,并且在该元素被消费了才返回(不用就一直在这个方法里等着)。
  • 代码:
    《Java并发编程的艺术》第六章 Java并发容器和框架
  • 代码说明:第一行是设置tail节点,第二行是自旋等待消费,会在一定次数后使用Thread.yield()方法暂停当前正在执行的线程。

tryTransfer方法

  • 试探生产者传入的元素能否直接传给消费者,有消费者接收就返回true,没有就返回false。

tryTransfer(E e, long timeout, TimeUnit unit)方法

  • 试图把生产者传入的元素直接传给消费者,如果指定时间内没消费就返回false,消费了就返回true。

LinkedBlockingDeque

特点:由链表结构组成、双向阻塞、多了几个处理First/Last的方法、两个入口,线程竞争减少一半。

方法:add等同于addLast,remove等同于removeFirst。但take等同于takeFirst。

初始化:可以设置容量防止其过度膨胀。


阻塞队列的实现原理

问题描述:如果队列为空,则消费者等待,如果后来队列插入了元素,那么消费者要怎么知道呢?(两种方法呗,要么一直查看,要么通知)。

使用通知模式实现:当生产者往满的队列里添加元素时会阻塞住生产者,当消费者消费了一个元素后,就会通知生产者当前队列可用。

源码

private final Condition notFull;
private final Condition notEmpty;

// 构造函数,通过重入锁实现
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            // 队列不可用,就要等待
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    // 在这里唤醒消费者的等待队列
    notEmpty.signal();
}

await()源码
主要通过park来实现等待,注意先把节点放到Condition的同步队列里了,然后释放了同步状态,然后检查在不在同步队列,不在的话就暂停了。
《Java并发编程的艺术》第六章 Java并发容器和框架

park源码
setBlocker的作用是保存一下要阻塞的线程,然后调用unsafe.park阻塞当前线程

public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    UNSAFE.park(false, 0L);
    setBlocker(t, null);
}

unsafe.park方法

public native void park(boolean isAbsolute, long time);

是个native方法,只有下面四种情况下,方法才会返回:

  • 与park对应的unpark执行或已经执行。“已经执行”指的是unpark先执行,然后再执行park的情况。
  • 线程被中断时。
  • 等待完time参数指定的毫秒数时
  • 异常现象发生时,这个异常现象没有任何原因。

unsafe.park代码
《Java并发编程的艺术》第六章 Java并发容器和框架

流程梳理:以插入为例,阻塞队列自己本身有put方法,如果队列满了,那么会调用Condition类型的notFullawait()方法,这个方法主要做了三件事:将线程添加到条件队列、释放同步状态、使用park方法暂停当前线程,然后park方法将该线程保存起来后,调用nativeunsafe.park方法。

线程状态:当线程被队列阻塞时,线程会进入WAITING状态。

WAITING和BLOCKED

  • 线程调用了wait、join、park函数时,会进入等待状态。
  • 等待是啥也不干等通知,通知到了可以去做事情,比如竞争锁,失败就进入阻塞状态。
  • 阻塞是不需要唤醒的,jvm来操作,有机会就上。
  • 可能是等待通知起来比较方便,我知道A线程一定会阻塞,那么我不如让他进入等待,等我知道啥时候有机会了在通知A。

《Java并发编程的艺术》第六章 Java并发容器和框架

相关标签: Java开发原理