欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Java-并发-锁-ReadWriteLock

程序员文章站 2022-07-12 11:30:59
...

Java-并发-锁-ReadWriteLock

摘要

现在大家开发程序,大多是在多线程场景,就会用到各种锁。

但其实往往读和读之间是不冲突的,是无状态无修改的,不应该互相互斥。我们往往只需在读写或者写与写之间互斥即可。在JDK中就直接提供了一个ReadWriteLock,本文会介绍其基本概念及分析源码。

0x01 基本概念

ReadWriteLock的互斥关系如下表:

不互斥 互斥
互斥 互斥

ReadWriteLock的重要知识点如下:

  • 读写锁分为读锁和写锁,分开使用
  • 读锁允许读可并发,但此时写锁申请会被阻塞
  • 写锁不允许任何并发,一旦有线程拥有写锁,其他线程的读写锁申请全被阻塞
  • ReadWriteLock适用于写少读多的场景
  • ReadWriteLock也被称为共享-独占锁
  • 读写锁都可重入,调用了几次lock就必须配套调用几次unlcok

使用时一般是这几个api:

// 获得读写锁实例
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
// 读锁加锁,可与其他线程读锁共享,阻塞其他写锁申请,可重入
lock.readLock().lock()
// 读锁解锁,调用次数必须和lock相同
lock.readLock().unlock();

// 写锁加锁,阻塞其他线程读写锁申请,可重入
lock.writeLock().lock();
// 写锁解锁,调用次数必须和lock相同
lock.writeLock().unlock();  

0x02 实现原理

2.1 流程分析

2.2 源码分析

2.2.1 ReadWriteLock接口

ReadWriteLock类是一个位于java.util.concurrent.locks的接口类。

public interface ReadWriteLock {
    // 获取读锁 
    Lock readLock();

    // 获取写锁 
    Lock writeLock();
}

这个接口申明真简单,我们下面看看最常用的一个实现java.util.concurrent.locks.ReentrantReadWriteLock

2.2.2 ReentrantReadWriteLock定义和重要属性

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
    private static final long serialVersionUID = -6992448646407690164L;
    // 读锁
    private final ReentrantReadWriteLock.ReadLock readerLock;
    // 写锁
    private final ReentrantReadWriteLock.WriteLock writerLock;
    // 用来支持同步机制,继承自AQS
    final Sync sync;

    // 默认非公平锁
    public ReentrantReadWriteLock() {
        this(false);
    }
    
    public ReentrantReadWriteLock(boolean fair) {
        // 有公平/非公平两种sync实现
        sync = fair ? new FairSync() : new NonfairSync();
        // 分别创建读写锁对象
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
    // 得到构建好了的写锁。
    // 逼死强迫症啊,为啥这里的顺序和构造方法里不同- - 
    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

2.2.3 Sync

  • 类定义
abstract static class Sync extends AbstractQueuedSynchronizer

继承自 AbstractQueuedSynchronizerAQS

  • 重要属性和方法
static final int SHARED_SHIFT   = 16;
// 2^16,1后面跟16个0。共享锁每次递增单位,其实就相当于高16位加1
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
// 2^16-1, 16个1。
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
// 2^16-1, 16个1。
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

// 保留高16位,代表共享锁(读锁)的许可数量
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count  */
// 保留低16位,代表独占锁(写锁)的许可数量
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

// 内部类,用来对每个线程持有的共享读锁计数
static final class HoldCounter {
    int count = 0;
    // 这里使用线程ID而不是线程对象引用,是为了避免GC遗漏
    final long tid = getThreadId(Thread.currentThread());
}
// 存最后一个成功申请共享读锁的线程拥有的锁许可数量
private transient HoldCounter cachedHoldCounter;

/**
 * ThreadLocal subclass. Easiest to explicitly define for sake
 * of deserialization mechanics.
 */
// 内部类,继承自ThreadLocal,泛型为HoldCounter
static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
    // 每个线程独有一个HoldCounter
        return new HoldCounter();
    }
}
// 当前线程持有的可重入读锁许可数量
// 在构造函数和readObject中初始化
// 当某个线程的读锁许可数量降为0,就移除该值
private transient ThreadLocalHoldCounter readHolds;

// 存第一个reader线程
private transient Thread firstReader = null;
// 存第一个reader线程的锁许可数量
private transient int firstReaderHoldCount;

// 构造方法,被非公平/公平的子类共用
Sync() {
    // 创建一个ThreadLocal的HoldCounter
    // 为每个线程单独统计拥有的共享读锁许可数量
    readHolds = new ThreadLocalHoldCounter();
    setState(getState()); // ensures visibility of readHolds
}

2.2.4 NonfairSync

NonfairSync是默认采用的,即非公平锁。

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -8159625535654395037L;
    final boolean writerShouldBlock() {
        return false; // writers can always barge
    }
    final boolean readerShouldBlock() {
        /* As a heuristic to avoid indefinite writer starvation,
         * block if the thread that momentarily appears to be head
         * of queue, if one exists, is a waiting writer.  This is
         * only a probabilistic effect since a new reader will not
         * block if there is a waiting writer behind other enabled
         * readers that have not yet drained from the queue.
         */
        return apparentlyFirstQueuedIsExclusive();
    }
}

2.2.5 FairSync

可以在初始化时指定为公平锁FairSync

static final class FairSync extends Sync {
    private static final long serialVersionUID = -2274990926593161451L;
    final boolean writerShouldBlock() {
        return hasQueuedPredecessors();
    }
    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();
    }
}

2.2.6 ReentrantReadWriteLock初始化

初始化使用的语句是new ReentrantReadWriteLock(),他主要干的工作如下:

sync = fair ? new FairSync() : new NonfairSync();
// 分别创建读写锁对象,并把当前lock的sync的引用传递给两个锁
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);

上面不管用公平与非公平锁模式,都会调用他们的父类构造函数:

// 创建一个ThreadLocal的HoldCounter
// 为每个线程单独统计拥有的共享读锁许可数量
readHolds = new ThreadLocalHoldCounter();
setState(getState());

2.2.7 lock.readLock().lock()

我们来看看读锁的加锁流程:

  • reentrantReadWriteLock.lock
public void lock() {
    // 直接调用AQS的公平模式申请锁,许可数为1
    sync.acquireShared(1);
}
  • AQS.acquireShared

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
    // 尝试申请共享锁失败时,就以不可中断的模式申请共享锁
        doAcquireShared(arg);
}
  • reentrantReadWriteLock.tryAcquireShared
// 此方法返回值如下:
// 1.负数代表申请共享锁失败
// 2.数字0代表共享锁申请成功,但是后续的共享锁申请都不能成功
// 3.正数代表共享锁申请成功,后续的共享锁申请可能也成功,此时必须检查后续wait线程可用性
protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    // 当前锁许可总数
    int c = getState();
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
    // 存在独占锁(写锁)且不是当前线程拥有,读写应阻塞,返回失败
        return -1;
    // 返回共享锁数量(就是锁的高16位)  
    int r = sharedCount(c);
    // readerShouldBlock判断是否应该被阻塞
    // 1.非公平锁调用apparentlyFirstQueuedIsExclusive,
    // 即如果wait_queue的真实线程头结点是独占锁(写锁),就返回true
    // 否则返回false
    // 2.而公平锁调用hasQueuedPredecessors
    // 当有线程结点在当前线程之前,返回true(公平性)
    // 当前线程为wait_queue真实头结点或wait_queue为空,返回false
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
    // 非公平锁模式,wait_queue真实头结点不是独占锁
    // 公平锁模式,当前线程为wait_queue真实头结点或wait_queue为空
    // 且共享锁数量尚未超出最大值
    // 且CAS更新锁许可高位(累积共享读锁数量)+1成功
        if (r == 0) {
        // 共享锁许可数为0
            // 当前线程作为头一个reader线程
            firstReader = current;
            // 分给他一个锁许可
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
        // 当前线程就是头一个reader线程,重入情况,直接累加锁许可
            firstReaderHoldCount++;
        } else {
        // 当前有别的线程持有共享锁
            // 最后一个成功申请共享读锁的线程和锁信息
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
            // 未曾有读锁获取或最后读锁不是当前线程
                // 拿到线程独有的HoldCounter,更新给cachedHoldCounter
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
            // 当前线程就是最后一个成功申请共享读锁,但还没有读锁?
                // 此时就将读锁信息更新给该线程
                // 因为在其他方法中,count==0时会remove
                readHolds.set(rh);
            // 最后累加读锁    
            rh.count++;
        }
        // 当前线程申请读锁成功,返回1
        return 1;
    }
    // 其余情况调用fullTryAcquireShared
    return fullTryAcquireShared(current);
}

总的来说:

  1. 别的线程拥有写锁,申请读锁失败,返回-1
  2. 无写锁,当前线程申请读锁成功成功,就更新线程的HoldCounter,返回1
  3. 其余情况,返回调用fullTryAcquireShared的结果
  • reentrantReadWriteLock.fullTryAcquireShared
// 该方法是申请读锁的完整版,有很多代码和tryAcquireShared相同
// 但处理了tryAcquireShared没有处理的CAS失败和非头Reader的重入情况
final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
        } else if (readerShouldBlock()) {
        // 此时当前reader线程应该被阻塞
        // 1.非公平锁调用apparentlyFirstQueuedIsExclusive,
        // 即如果wait_queue的真实线程头结点是独占锁(写锁),就返回true
        // 2.而公平锁调用hasQueuedPredecessors
        // 当有线程结点在当前线程之前,返回true(公平性)
        
            if (firstReader == current) {
            // 重入读锁
                // assert firstReaderHoldCount > 0;
            } else {
            // 非重入读锁
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        // 更新为当前线程的HoldCounter
                        rh = readHolds.get();
                        if (rh.count == 0)
                        // 当前线程的HoldCounter,就remove掉该HoldCounter
                        // 目的是防止ThreadLocal未释放造成的弱引用对象内存泄露
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                // 注意此时该reader线程应该阻塞
                    // 所以当当前线程已有锁许可为0,就认为失败,返回-1
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT)
        // 锁许可达到最大值,抛出异常
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
        // CAS为当前高位(共享锁)累积锁许可数增加锁许可成功
            if (sharedCount(c) == 0) {
            // 之前的共享锁许可数为0
                // 当前线程作为头一个reader线程
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
            // 当前线程就是头一个reader线程,重入情况,直接累加锁许可
                firstReaderHoldCount++;
            } else {
            // 当前有别的线程持有共享锁
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                // 当前线程共享读锁许可数自增    
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            // 当前线程读锁成功申请,返回1
            return 1;
        }
    }
}
  • AQS.doAcquireShared
// 以不可中断的模式申请共享锁
private void doAcquireShared(int arg) {
    // 当前线程以SHARED共享模式加入wait_queue
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 前驱结点
            final Node p = node.predecessor();
            if (p == head) {
            // 此时当前结点就是真实头结点
                // 尝试申请共享读锁
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                // 申请成功
                    // 设置队列头,并检查后继者是否可能在共享锁模式下等待,
                    // 如果是传播,则设置传播> 0或PROPAGATE状态。
                    setHeadAndPropagate(node, r);
                    // 从wait_queue移除当前节点
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    // 成功,退出循环返回
                    return;
                }
            }
            // 更新前驱结点为SIGNAL,阻塞当前线程直到唤醒
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}