欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

ReentrantReadWriteLock实现原理

程序员文章站 2024-01-25 20:31:34
在java并发包java.util.concurrent中,除了重入锁ReentrantLock外,读写锁ReentrantReadWriteLock也很常用。在实际开发场景中,在使用共享资源时,可能读操作远远多于写操作。这种情况下,如果对这部分共享资源能够让多个线程读的时候不受阻塞,仅仅在写的时候 ......

  在java并发包java.util.concurrent中,除了重入锁reentrantlock外,读写锁reentrantreadwritelock也很常用。在实际开发场景中,在使用共享资源时,可能读操作远远多于写操作。这种情况下,如果对这部分共享资源能够让多个线程读的时候不受阻塞,仅仅在写的时候保证安全性,这样效率会得到显著提升。读写锁reentrantreadwritelock便适用于这种场景。

  再描述一下进入读锁和写锁的条件。

  进入读锁: 

      1.没有其他线程的写锁

      2.有写请求且请求线程就是持有锁的线程

  进入写锁:

      1.没有其他线程读锁

      2.没有其他线程写锁

  本篇从源码方面,简要分析reentrantreadwritelock的实现原理,以及展示一下它的使用效果。

源码

  这是reentrantreadwritelock维护的一对锁

/** inner class providing readlock */
    private final reentrantreadwritelock.readlock readerlock;
    /** inner class providing writelock */
    private final reentrantreadwritelock.writelock writerlock;

  

  reentrantreadwritelock的构造器中,同时实例化读写锁,同时与reentrantlock相同,也有公平锁和非公平锁之分

public reentrantreadwritelock(boolean fair) {
        sync = fair ? new fairsync() : new nonfairsync();
        readerlock = new readlock(this);
        writerlock = new writelock(this);
    }

写锁

  获取锁

public void lock() {
            sync.acquire(1);
        }
//这里与reentrantlock相同
public final void acquire(int arg) {
        if (!tryacquire(arg) &&
            acquirequeued(addwaiter(node.exclusive), arg))
            selfinterrupt();
    }

protected final boolean tryacquire(int acquires) {
            thread current = thread.currentthread();
            int c = getstate();
            int w = exclusivecount(c);
            if (c != 0) {
                // (note: if c != 0 and w == 0 then shared count != 0)
                if (w == 0 || current != getexclusiveownerthread())
                    return false;
                if (w + exclusivecount(acquires) > max_count)
                    throw new error("maximum lock count exceeded");
                // reentrant acquire
                setstate(c + acquires);
                return true;
            }
            if (writershouldblock() ||
                !compareandsetstate(c, c + acquires))
                return false;
            setexclusiveownerthread(current);
            return true;
        }

  这里解析tryacquire()方法。

  • 获取当前线程
  • 获取状态
  • 获取写线程数
  • 若state不为0,表示锁已被持有。再判断,如果写线程数为0,则读锁被占用,返回false;如果写线程数不为0,且独占线程不是当前线程,表示写锁被其他线程占用没返回false
  • 如果写锁重入数大于最大值max_count,抛错
  • 写锁重入,返回true
  • state为0,根据公平锁还是非公平锁判断是否阻塞线程。不需要阻塞就cas更新state
  • 当前线程设为独占线程,获取写锁,返回true

  释放锁

public void unlock() {
            sync.release(1);
        }

public final boolean release(int arg) {
        if (tryrelease(arg)) {
            node h = head;
            if (h != null && h.waitstatus != 0)
                unparksuccessor(h);
            return true;
        }
        return false;
    }

protected final boolean tryrelease(int releases) {
            if (!isheldexclusively())
                throw new illegalmonitorstateexception();
            int nextc = getstate() - releases;
            boolean free = exclusivecount(nextc) == 0;
            if (free)
                setexclusiveownerthread(null);
            setstate(nextc);
            return free;
        }

  分析tryrelease()方法

  • 判断持有写锁的线程是否当前线程,不是则抛错
  • state减1
  • 以新state计算写锁数量,如果为0,表示完全释放;
  • 完全释放就设置独占线程为null
  • 如果独占线程数量不是0,还是更新state,这里就表示多次重入写锁后,释放了一次

读锁

  获取锁

public void lock() {
            sync.acquireshared(1);
        }

public final void acquireshared(int arg) {
        if (tryacquireshared(arg) < 0)
            doacquireshared(arg);
    }

protected final int tryacquireshared(int unused) {
            thread current = thread.currentthread();
            int c = getstate();
            if (exclusivecount(c) != 0 &&
                getexclusiveownerthread() != current)
                return -1;
            int r = sharedcount(c);
            if (!readershouldblock() &&
                r < max_count &&
                compareandsetstate(c, c + shared_unit)) {
                if (r == 0) {
                    firstreader = current;
                    firstreaderholdcount = 1;
                } else if (firstreader == current) {
                    firstreaderholdcount++;
                } else {
                    holdcounter rh = cachedholdcounter;
                    if (rh == null || rh.tid != getthreadid(current))
                        cachedholdcounter = rh = readholds.get();
                    else if (rh.count == 0)
                        readholds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fulltryacquireshared(current);
        }

  这里分析tryacquireshared()方法

  • 获取当前线程
  • 获取state
  • 如果写锁数量不为0,且独占线程不是本线程,获得读锁失败。因为写锁被其他线程占用
  • 获取读锁数量
  • 根据公平锁或者非公平锁判断是否应该被阻塞,判断读锁数量是否小于最大值max_count,再尝试cas更新state
  • 以上判断都通过且更新state也成功后,如果读锁为0,记录第一个读线程和此线程占用读锁数量
  • 如果第一个读线程是本线程,表示此时是读锁的重入,则把此线程占用读锁数量+1
  • 如果读锁数量不为0,且此线程也不是第一个读线程,则找到当前线程的计数器,并计数+1
  • 如果在阻塞判断,读锁数量判断和cas更新是否成功这部分没有通过,则进入fulltryacquireshared()方法,逻辑与上面的获取类似,以无限循环方式保证操作成功,不赘述。

释放锁

  

public void unlock() {
            sync.releaseshared(1);
        }
public final boolean releaseshared(int arg) {
        if (tryreleaseshared(arg)) {
            doreleaseshared();
            return true;
        }
        return false;
    }

protected final boolean tryreleaseshared(int unused) {
            thread current = thread.currentthread();
            if (firstreader == current) {
                // assert firstreaderholdcount > 0;
                if (firstreaderholdcount == 1)
                    firstreader = null;
                else
                    firstreaderholdcount--;
            } else {
                holdcounter rh = cachedholdcounter;
                if (rh == null || rh.tid != getthreadid(current))
                    rh = readholds.get();
                int count = rh.count;
                if (count <= 1) {
                    readholds.remove();
                    if (count <= 0)
                        throw unmatchedunlockexception();
                }
                --rh.count;
            }
            for (;;) {
                int c = getstate();
                int nextc = c - shared_unit;
                if (compareandsetstate(c, nextc))
                    // releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
            }
        }

  分析tryreleaseshared()方法

  • 获取当前线程
  • 如果当前线程是第一个读线程,则释放firstreader或者第一个读线程的锁计数-1
  • 不是就获得当前线程的计数器。根据计数选择删除此计数器或者减少计数
  • 无限循环更新state  

获取锁和释放锁的源码部分代码就分析放到这里,接下来用代码时间看看reentrantreadwritelock的使用效果测试。

public class readwritelocktest {
    private static reentrantreadwritelock readwritelock = new reentrantreadwritelock();
    private static executorservice executorservice = executors.newcachedthreadpool();
    //读操作
    public static void read(){
        try {
       //加读锁 readwritelock.readlock().lock(); system.out.println(thread.currentthread().getname() + " is reading " + system.currenttimemillis()); thread.sleep(1000); } catch (interruptedexception e){ }finally { readwritelock.readlock().unlock(); } } //写操作 public static void write() { try {
       //加写锁 readwritelock.writelock().lock(); system.out.println(thread.currentthread().getname() + " is writing "+ system.currenttimemillis()); thread.sleep(1000); } catch (interruptedexception e){ }finally { readwritelock.writelock().unlock(); } } public static void main(string[] args) { for (int i = 0; i < 3; i++) { executorservice.execute(new runnable() { @override public void run() { readwritelocktest.read(); } }); } for (int i = 0; i < 3; i++) { executorservice.execute(new runnable() { @override public void run() { readwritelocktest.write(); } }); } } }

  执行结果如下:

pool-1-thread-2 is reading 1549002279198
pool-1-thread-1 is reading 1549002279198
pool-1-thread-3 is reading 1549002279198
pool-1-thread-4 is writing 1549002280208
pool-1-thread-5 is writing 1549002281214
pool-1-thread-6 is writing 1549002282224

  可以看到,thread1,2,3在读时,是同时执行。thread4,5,6在写操作是,都差不多间隔1000毫秒。