欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

理解java.util.concurrent包(二)

程序员文章站 2022-05-04 21:39:54
...

下面我们来看java.util.concurrent.locks包:
理解java.util.concurrent包(二)

1.AbstractOwnableSynchronizer、AbstractQueuedLongSynchronizer、AbstractQueuedSynchronizer

AbstractQueuedLongSynchronizer和AbstractQueuedSynchronizer是AbstractOwnableSynchronizer的子类。AbstractOwnableSynchronizer主要包括了一个Thread类型的变量exclusiveOwnerThread,并且对这个变量有get/set方法,来设置和获取独占的线程。
AbstractQueuedLongSynchronizer是从JDK6才开始出现的,还没有具体实现的子类
AbstractQueuedSynchronizer的子类实现在可重入锁里面
AbstractQueuedSynchronizer主要有四个最核心的方法:

protected boolean tryAcquire(int arg)
protected boolean tryRelease(int arg)
protected int tryAcquireShared(int arg)
protected boolean tryReleaseShared(int arg)

分别对应获取和释放锁,前两个用于独占锁,后两个用于共享锁,这四个方法由子类来实现,默认实现是抛出UnsupportedOperationException。
AbstractQueuedSynchronizer仅仅提供获取和释放锁的流程,即当前线程尝试获取锁的时候,AbstractQueuedSynchronizer会先调用tryAcquire或者tryAcquireShared,如果返回false,就把当前线程放到等待队列中去,然后再做进一步操作。
AbstractQueuedSynchronizer按获取过程中的处理方式可以分为三种:响应中断,不响应中断和尝试限时等待超时放弃。这三种方式均针对共享锁和独占锁。
1.独占锁不响应中断

  public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

我们看到线程首先会尝试获取锁,如果获取成功则进行操作,若是没有获取到,则将这个线程加入到等待的队列中,我们来看acquireQueued方法:

final boolean acquireQueued(final Node node, int arg) {
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }

首先获取前一个节点,若是头结点,那么久再次尝试获取,如果获取成功则返回,如果检测到线程中断,返回true,否则返回false.
如果等待期间检测到中断信号,也就是acquireQueued返回了true,会用selfInterrupt中断当前线程。我们看shouldParkAfterFailedAcquire方法:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            //前面节点在释放锁以后会告诉当前线程,等待即可
            return true;
        if (ws > 0) {
           //大于0的状态只有cancel,所以需要跳过,并且从队列中清除出去
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
        }
        return false;
    }

2.响应中断的独占锁:

 public final void acquireInterruptibly(long arg) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

它和不响应中断的区别在于调用doAcquireInterruptibly,doAcquireInterruptibly在发现中断后,直接break,然后取消获取锁的打算。如下:

private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }

3.限时等待超时放弃

private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L) {
                    cancelAcquire(node);
                    return false;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }

通过循环的方式,若前一个节点是头结点则再次请求一次,成功则返回,否则,累积时间差,若是超时,则取消并返回false。如果线程被中断则抛出异常。
4.不响应中断的共享锁

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

我们看doAcquireShared方法:

 private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }   

与独占锁的区别是doAcquireShared把selfInterrupt()挪到自己的方法内,其次就是共享锁调用setHeadAndPropagate方法:

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

独占锁某个节点被唤醒之后,它只需要将这个节点设置成head就完事了,而共享锁不一样,某个节点被设置为head之后,如果它的后继节点是SHARED状态的,那么将继续通过doReleaseShared方法尝试往后唤醒节点,实现了共享状态的向后传播。我们看doReleaseShared()方法:

private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

当头结点本身的waitStatus是SIGNAL且能通过CAS算法将头结点的waitStatus从SIGNAL设置为0时,唤醒头结点的后继节点;当头结点本身的waitStatus是0的话,尝试将其设置为PROPAGATE状态的,意味着共享状态可以向后传播。

共享锁的响应中断和超时放弃与上面同理。

在AQS的Node中有每个Node有自己的状态(waitStatus),我们这里归纳一下,分别包含:
SIGNAL——前面有线程在运行,需要前面线程结束后,调用unpark()方法才能**自己,值为:-1
CANCELLED——当AQS发起取消或fullyRelease()时,会是这个状态。值为1,也是几个状态中唯一一个大于0的状态,所以前面判定状态大于0就基本等价于是CANCELLED。
CONDITION——线程基于Condition对象发生了等待,进入了相应的队列,自然也需要Condition对象来**,值为-2。
PROPAGATE——传播。由于共享访问的特点,连续的读操作节点可以依次进入临界区,设为PROPAGATE有助于实现这种迭代操作。
状态0——.初始化状态,也代表正在尝试去获取临界资源的线程所对应的Node的状态。

2.ReentrantLock

ReentrantLock实现了Lock方法,在ReentrantLock中包含了三个类,Sync,NonfairSync,FairSync,其中后面两个类继承第一个类。
ReentrantLock提供了两个构造器,分别是:

public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

默认构造器初始化为NonfairSync对象,即非公平锁,而带参数的构造器可以指定使用公平锁和非公平锁。
Sync继承了AQS,sync中的lock方法是abstract的,需要子类实现。

NonfairSync

实现的lock方法:

final void lock() {
      if (compareAndSetState(0, 1))
           setExclusiveOwnerThread(Thread.currentThread());
      else
           acquire(1);
}

首先是一个CAS操作,判断state是否是0(表示当前锁未被占用),如果是0则把它置为1,并且设置当前线程为该锁的独占线程,表示获取锁成功。当多个线程同时尝试占用同一个锁时,CAS操作只能保证一个线程操作成功,则排队。

“非公平”即体现不是按顺序获取,比如占用锁的线程刚释放锁,state置为0,而排队等待锁的线程还未唤醒时,新来的线程可能就直接抢占了该锁。

若获取锁失败,会进入acquire。当然,这个方法包含在AQS里面,可以翻看前面,之前我们说tryAcquire。

会给子类实现,AQS只是提供了一个流程,我们看非公平锁中实现的tryAcquire:

protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
}

这个方法调用的是父类的nonfairTryAcquire:

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {//没有线程占用锁
                if (compareAndSetState(0, acquires)) {
                //占用锁成功,设置独占线程为当前线程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                    // 更新state值为新的重入次数
                setState(nextc);
                return true;
            }
            return false;
        }

从上面看出来非公平锁tryAcquire的流程是:检查state字段,若为0,表示锁未被占用,那么尝试占用,若不为0,检查当前占用锁是否是自己,若是则更新state字段,表示重入锁的次数。如果以上两点都没有成功,则获取锁失败,返回false。
#####FairSync
FairSync实现的lock方法如下:

 final void lock() {
       acquire(1);
  }

调用了acquire并且传入了参数1,acquire方法我们可以看到前文是在AQS中的,tryAcquire是在子类中实现的,我们看FairSync中的tryAcquire方法:

protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {{//没有线程占用锁
                if (!hasQueuedPredecessors() && //是否已经是最前面的节点
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

所以我们看到和非公平锁的区别主要是会判断是否按顺序()从头结点后第一个节点开始拿)。

ReentrantReadWriteLock

ReentrantReadWriteLock实现了ReadWriteLock接口,该接口提供readLock()方法获取读锁,writeLock()获取写锁:

public interface ReadWriteLock {
    Lock readLock();
    Lock writeLock();
}

接下来我们看构造方法:

public ReentrantReadWriteLock() {
        this(false);
}

public ReentrantReadWriteLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}

Sync是ReentrantReadWriteLock的内部类,继承自AQS,并且有两个子类,一个是公平锁,还有一个是非公平锁,接下来我们来看Sync中的一组常量:

static final int SHARED_SHIFT   = 16;
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

ReentrantReadWriterLock使用一个32位的int类型来表示锁被占用的线程数,高16位表示读锁占有的线程数量,低16位表示写锁被同一个线程申请的次数。
1. SHARED_SHIFT,表示读锁占用的位数,常量16
2. SHARED_UNIT,增加一个读锁,按照上述设计,就相当于增加 SHARED_UNIT;
3. MAX_COUNT,表示申请读锁最大的线程数量,为65535
4. EXCLUSIVE_MASK :表示计算写锁的具体值,用 getState &
EXCLUSIVE_MASK算出写锁的线程数,大于1表示重入。

static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }//读锁线程的数量 
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }// 写锁的计数,也就是它的重入次数
读锁

读锁,锁定的是AQS的state变量的高16位,当state的高16位等于0,表示当前读锁未被占有;当state的高16位大于0,表示当前读锁可能被一个或多个线程占有,允许重入。
一般来说读锁和写锁互斥,但是如果战有写锁的线程再次获取读锁,那么写锁降级为读锁。
我们看读锁的lock方法:

public void lock() {
     sync.acquireShared(1);
 }

acquireShared调用Sync父类AQS的acquireShared方法,调用的tryAcquireShared(arg)方法在Sync方法中实现:

protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&// 已分配了写锁
                getExclusiveOwnerThread() != current) // 且当前线程不是持有写锁的线程
                return -1;
            int r = sharedCount(c);// 取读锁计数
            if (!readerShouldBlock() &&// 由子类根据其公平策略决定是否允许获取读锁
                r < MAX_COUNT &&// 读锁数量还没达到最大值(2^16 -1)
                compareAndSetState(c, c + SHARED_UNIT)) {//通过CAS操作将读锁占有量+1(AQS的state高16位同步加1)
                // 注意下面对firstReader的处理:firstReader是不会放到readHolds里的
     // 这样,在读锁只有一个的情况下,就避免了查找readHolds。
        if (r == 0) { // 是 firstReader,计数不会放入  readHolds。
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) { // firstReader 重入
            firstReaderHoldCount++;
        } else {
             // 非 firstReader 读锁重入计数更新
            HoldCounter rh = cachedHoldCounter; // 首先访问缓存
            if (rh == null || rh.tid != current.getId())
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    // 获取读锁失败,放到循环里重试。
    return fullTryAcquireShared(current);
}

齐总readerShouldBlock在公平模式和 非公平模式下有所区别,公平模式下:

final boolean readerShouldBlock() {
        return apparentlyFirstQueuedIsExclusive();//等待队列head节点后的节点非共享节点,返回true
    }

final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null &&
            (s = h.next)  != null &&
            !s.isShared()         &&
            s.thread != null;
    }

公平模式下:

final boolean readerShouldBlock() {
        return hasQueuedPredecessors();//等待队列的头尾不为空,并且存在head后的节点并且节点的线程非当前线程,返回true。
    }

public final boolean hasQueuedPredecessors() {
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

接下来我们看unlock方法:

public void unlock() {
    sync.releaseShared(1);
}

releaseShared方法是ASQ中的方法,其中调用了tryReleaseShared,这个方法在子类实现,我们看到Sync中实现了这个方法:

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    // 清理firstReader缓存 或 readHolds里的重入计数
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != current.getId())
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            // 完全释放读锁
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count; // 主要用于重入退出
    }
    // 循环在CAS更新状态值,主要是把读锁数量减 1
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // 释放读锁对其他读线程没有任何影响,
            // 但可以允许等待的写线程继续,如果读锁、写锁都空闲。
            return nextc == 0;
    }
}

读锁的释放就是AQS的state高16位同步递减为0的过程,当高16位都为0则读锁释放完毕,如果此时写锁状态为0,唤醒head节点后下一个SIGNAL状态的节点的线程,一般为等待写锁的节点。如果读锁的占有数不为0,表示读锁未完全释放。或者写锁的占有数不为0,表示释放的读锁是写锁降级来的。
接下来我们分析写锁:
请求锁的过程如下:

protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) { // 状态不为0,表示锁被分配出去了。

        // (Note: if c != 0 and w == 0 then shared count != 0)
      // c != 0 and w == 0 表示分配了读锁
      // w != 0 && current != getExclusiveOwnerThread() 表示其他线程获取了写锁。
        if (w == 0 || current != getExclusiveOwnerThread())
            return false ;

        // 写锁重入
        // 检测是否超过最大重入次数。
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");

        // 更新写锁重入次数,写锁在低位,直接加上 acquire 即可。
        // Reentrant acquire
        setState(c + acquires);
        return true ;
    }

    // writerShouldBlock 留给子类实现,用于实现公平性策略。
    // 如果允许获取写锁,则用 CAS 更新状态。
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false ; // 不允许获取锁 或 CAS 失败。

    // 获取写锁超过,设置独占线程。
    setExclusiveOwnerThread(current);
    return true;
}

释放锁的过程如下:

 protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();//释放的当前线程不是持有锁的线程,抛出异常
            int nextc = getState() - releases;//当前重入锁的数量
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        }
相关标签: concurrent