欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Java concurrency之共享锁和ReentrantReadWriteLock_动力节点Java学院整理

程序员文章站 2023-12-16 22:29:28
readwritelock 和 reentrantreadwritelock介绍 readwritelock,顾名思义,是读写锁。它维护了一对相关的锁 — — “读取...

readwritelock 和 reentrantreadwritelock介绍

readwritelock,顾名思义,是读写锁。它维护了一对相关的锁 — — “读取锁”和“写入锁”,一个用于读取操作,另一个用于写入操作。

“读取锁”用于只读操作,它是“共享锁”,能同时被多个线程获取。

“写入锁”用于写入操作,它是“独占锁”,写入锁只能被一个线程锁获取。

注意:不能同时存在读取锁和写入锁!

readwritelock是一个接口。reentrantreadwritelock是它的实现类,reentrantreadwritelock包括子类readlock和writelock。

readwritelock 和 reentrantreadwritelock函数列表

readwritelock函数列表

// 返回用于读取操作的锁。
lock readlock()
// 返回用于写入操作的锁。
lock writelock()

reentrantreadwritelock函数列表

// 创建一个新的 reentrantreadwritelock,默认是采用“非公平策略”。
reentrantreadwritelock()
// 创建一个新的 reentrantreadwritelock,fair是“公平策略”。fair为true,意味着公平策略;否则,意味着非公平策略。
reentrantreadwritelock(boolean fair)

// 返回当前拥有写入锁的线程,如果没有这样的线程,则返回 null。
protected thread getowner()
// 返回一个 collection,它包含可能正在等待获取读取锁的线程。
protected collection<thread> getqueuedreaderthreads()
// 返回一个 collection,它包含可能正在等待获取读取或写入锁的线程。
protected collection<thread> getqueuedthreads()
// 返回一个 collection,它包含可能正在等待获取写入锁的线程。
protected collection<thread> getqueuedwriterthreads()
// 返回等待获取读取或写入锁的线程估计数目。
int getqueuelength()
// 查询当前线程在此锁上保持的重入读取锁数量。
int getreadholdcount()
// 查询为此锁保持的读取锁数量。
int getreadlockcount()
// 返回一个 collection,它包含可能正在等待与写入锁相关的给定条件的那些线程。
protected collection<thread> getwaitingthreads(condition condition)
// 返回正等待与写入锁相关的给定条件的线程估计数目。
int getwaitqueuelength(condition condition)
// 查询当前线程在此锁上保持的重入写入锁数量。
int getwriteholdcount()
// 查询是否给定线程正在等待获取读取或写入锁。
boolean hasqueuedthread(thread thread)
// 查询是否所有的线程正在等待获取读取或写入锁。
boolean hasqueuedthreads()
// 查询是否有些线程正在等待与写入锁有关的给定条件。
boolean haswaiters(condition condition)
// 如果此锁将公平性设置为 ture,则返回 true。
boolean isfair()
// 查询是否某个线程保持了写入锁。
boolean iswritelocked()
// 查询当前线程是否保持了写入锁。
boolean iswritelockedbycurrentthread()
// 返回用于读取操作的锁。
reentrantreadwritelock.readlock readlock()
// 返回用于写入操作的锁。
reentrantreadwritelock.writelock writelock()

reentrantreadwritelock数据结构

reentrantreadwritelock的uml类图如下:

Java concurrency之共享锁和ReentrantReadWriteLock_动力节点Java学院整理

从中可以看出:

(01) reentrantreadwritelock实现了readwritelock接口。readwritelock是一个读写锁的接口,提供了"获取读锁的readlock()函数" 和 "获取写锁的writelock()函数"。

(02) reentrantreadwritelock中包含:sync对象,读锁readerlock和写锁writerlock。读锁readlock和写锁writelock都实现了lock接口。读锁readlock和写锁writelock中也都分别包含了"sync对象",它们的sync对象和reentrantreadwritelock的sync对象 是一样的,就是通过sync,读锁和写锁实现了对同一个对象的访问。

(03) 和"reentrantlock"一样,sync是sync类型;而且,sync也是一个继承于aqs的抽象类。sync也包括"公平锁"fairsync和"非公平锁"nonfairsync。sync对象是"fairsync"和"nonfairsync"中的一个,默认是"nonfairsync"。

其中,共享锁源码相关的代码如下:

public static class readlock implements lock, java.io.serializable {
  private static final long serialversionuid = -5992448646407690164l;
  // reentrantreadwritelock的aqs对象
  private final sync sync;

  protected readlock(reentrantreadwritelock lock) {
    sync = lock.sync;
  }

  // 获取“共享锁”
  public void lock() {
    sync.acquireshared(1);
  }

  // 如果线程是中断状态,则抛出一场,否则尝试获取共享锁。
  public void lockinterruptibly() throws interruptedexception {
    sync.acquiresharedinterruptibly(1);
  }

  // 尝试获取“共享锁”
  public boolean trylock() {
    return sync.tryreadlock();
  }

  // 在指定时间内,尝试获取“共享锁”
  public boolean trylock(long timeout, timeunit unit)
      throws interruptedexception {
    return sync.tryacquiresharednanos(1, unit.tonanos(timeout));
  }

  // 释放“共享锁”
  public void unlock() {
    sync.releaseshared(1);
  }

  // 新建条件
  public condition newcondition() {
    throw new unsupportedoperationexception();
  }

  public string tostring() {
    int r = sync.getreadlockcount();
    return super.tostring() +
      "[read locks = " + r + "]";
  }
}

说明:

readlock中的sync是一个sync对象,sync继承于aqs类,即sync就是一个锁。reentrantreadwritelock中也有一个sync对象,而且readlock中的sync和reentrantreadwritelock中的sync是对应关系。即reentrantreadwritelock和readlock共享同一个aqs对象,共享同一把锁。

reentrantreadwritelock中sync的定义如下:

final sync sync;

下面,分别从“获取共享锁”和“释放共享锁”两个方面对共享锁进行说明。

获取共享锁

获取共享锁的思想(即lock函数的步骤),是先通过tryacquireshared()尝试获取共享锁。尝试成功的话,则直接返回;尝试失败的话,则通过doacquireshared()不断的循环并尝试获取锁,若有需要,则阻塞等待。doacquireshared()在循环中每次尝试获取锁时,都是通过tryacquireshared()来进行尝试的。下面看看“获取共享锁”的详细流程。

1. lock()

lock()在readlock中,源码如下:

public void lock() {
  sync.acquireshared(1);
}

2. acquireshared()

sync继承于aqs,acquireshared()定义在aqs中。源码如下:

public final void acquireshared(int arg) {
  if (tryacquireshared(arg) < 0)
    doacquireshared(arg);
}

说明:acquireshared()首先会通过tryacquireshared()来尝试获取锁。

尝试成功的话,则不再做任何动作(因为已经成功获取到锁了)。

尝试失败的话,则通过doacquireshared()来获取锁。doacquireshared()会获取到锁了才返回。

3. tryacquireshared()

tryacquireshared()定义在reentrantreadwritelock.java的sync中,源码如下:

protected final int tryacquireshared(int unused) {
  thread current = thread.currentthread();
  // 获取“锁”的状态
  int c = getstate();
  // 如果“锁”是“互斥锁”,并且获取锁的线程不是current线程;则返回-1。
  if (exclusivecount(c) != 0 &&
    getexclusiveownerthread() != current)
    return -1;
  // 获取“读取锁”的共享计数
  int r = sharedcount(c);
  // 如果“不需要阻塞等待”,并且“读取锁”的共享计数小于max_count;
  // 则通过cas函数更新“锁的状态”,将“读取锁”的共享计数+1。
  if (!readershouldblock() &&
    r < max_count &&
    compareandsetstate(c, c + shared_unit)) {
    // 第1次获取“读取锁”。
    if (r == 0) { 
      firstreader = current;
      firstreaderholdcount = 1;
    // 如果想要获取锁的线程(current)是第1个获取锁(firstreader)的线程
    } else if (firstreader == current) { 
      firstreaderholdcount++;
    } else {
      // holdcounter是用来统计该线程获取“读取锁”的次数。
      holdcounter rh = cachedholdcounter;
      if (rh == null || rh.tid != current.getid())
        cachedholdcounter = rh = readholds.get();
      else if (rh.count == 0)
        readholds.set(rh);
      // 将该线程获取“读取锁”的次数+1。
      rh.count++;
    }
    return 1;
  }
  return fulltryacquireshared(current);
}

说明:tryacquireshared()的作用是尝试获取“共享锁”。

如果在尝试获取锁时,“不需要阻塞等待”并且“读取锁的共享计数小于max_count”,则直接通过cas函数更新“读取锁的共享计数”,以及将“当前线程获取读取锁的次数+1”。否则,通过fulltryacquireshared()获取读取锁。

4. fulltryacquireshared()

fulltryacquireshared()在reentrantreadwritelock中定义,源码如下:

final int fulltryacquireshared(thread current) {
  holdcounter rh = null;
  for (;;) {
    // 获取“锁”的状态
    int c = getstate();
    // 如果“锁”是“互斥锁”,并且获取锁的线程不是current线程;则返回-1。
    if (exclusivecount(c) != 0) {
      if (getexclusiveownerthread() != current)
        return -1;
    // 如果“需要阻塞等待”。
    // (01) 当“需要阻塞等待”的线程是第1个获取锁的线程的话,则继续往下执行。
    // (02) 当“需要阻塞等待”的线程获取锁的次数=0时,则返回-1。
    } else if (readershouldblock()) {
      // 如果想要获取锁的线程(current)是第1个获取锁(firstreader)的线程
      if (firstreader == current) {
      } else {
        if (rh == null) {
          rh = cachedholdcounter;
          if (rh == null || rh.tid != current.getid()) {
            rh = readholds.get();
            if (rh.count == 0)
              readholds.remove();
          }
        }
        // 如果当前线程获取锁的计数=0,则返回-1。
        if (rh.count == 0)
          return -1;
      }
    }
    // 如果“不需要阻塞等待”,则获取“读取锁”的共享统计数;
    // 如果共享统计数超过max_count,则抛出异常。
    if (sharedcount(c) == max_count)
      throw new error("maximum lock count exceeded");
    // 将线程获取“读取锁”的次数+1。
    if (compareandsetstate(c, c + shared_unit)) {
      // 如果是第1次获取“读取锁”,则更新firstreader和firstreaderholdcount。
      if (sharedcount(c) == 0) {
        firstreader = current;
        firstreaderholdcount = 1;
      // 如果想要获取锁的线程(current)是第1个获取锁(firstreader)的线程,
      // 则将firstreaderholdcount+1。
      } else if (firstreader == current) {
        firstreaderholdcount++;
      } else {
        if (rh == null)
          rh = cachedholdcounter;
        if (rh == null || rh.tid != current.getid())
          rh = readholds.get();
        else if (rh.count == 0)
          readholds.set(rh);
        // 更新线程的获取“读取锁”的共享计数
        rh.count++;
        cachedholdcounter = rh; // cache for release
      }
      return 1;
    }
  }
}

说明:fulltryacquireshared()会根据“是否需要阻塞等待”,“读取锁的共享计数是否超过限制”等等进行处理。如果不需要阻塞等待,并且锁的共享计数没有超过限制,则通过cas尝试获取锁,并返回1。

5. doacquireshared()

doacquireshared()定义在aqs函数中,源码如下:

private void doacquireshared(int arg) {
  // addwaiter(node.shared)的作用是,创建“当前线程”对应的节点,并将该线程添加到clh队列中。
  final node node = addwaiter(node.shared);
  boolean failed = true;
  try {
    boolean interrupted = false;
    for (;;) {
      // 获取“node”的前一节点
      final node p = node.predecessor();
      // 如果“当前线程”是clh队列的表头,则尝试获取共享锁。
      if (p == head) {
        int r = tryacquireshared(arg);
        if (r >= 0) {
          setheadandpropagate(node, r);
          p.next = null; // help gc
          if (interrupted)
            selfinterrupt();
          failed = false;
          return;
        }
      }
      // 如果“当前线程”不是clh队列的表头,则通过shouldparkafterfailedacquire()判断是否需要等待,
      // 需要的话,则通过parkandcheckinterrupt()进行阻塞等待。若阻塞等待过程中,线程被中断过,则设置interrupted为true。
      if (shouldparkafterfailedacquire(p, node) &&
        parkandcheckinterrupt())
        interrupted = true;
    }
  } finally {
    if (failed)
      cancelacquire(node);
  }
}

说明:doacquireshared()的作用是获取共享锁。

它会首先创建线程对应的clh队列的节点,然后将该节点添加到clh队列中。clh队列是管理获取锁的等待线程的队列。
如果“当前线程”是clh队列的表头,则尝试获取共享锁;否则,则需要通过shouldparkafterfailedacquire()判断是否阻塞等待,需要的话,则通过parkandcheckinterrupt()进行阻塞等待。

doacquireshared()会通过for循环,不断的进行上面的操作;目的就是获取共享锁。需要注意的是:doacquireshared()在每一次尝试获取锁时,是通过tryacquireshared()来执行的!

释放共享锁

释放共享锁的思想,是先通过tryreleaseshared()尝试释放共享锁。尝试成功的话,则通过doreleaseshared()唤醒“其他等待获取共享锁的线程”,并返回true;否则的话,返回flase。

1. unlock()

public void unlock() {
  sync.releaseshared(1);
}

说明:该函数实际上调用releaseshared(1)释放共享锁。

2. releaseshared()

releaseshared()在aqs中实现,源码如下:

public final boolean releaseshared(int arg) {
  if (tryreleaseshared(arg)) {
    doreleaseshared();
    return true;
  }
  return false;
}

说明:releaseshared()的目的是让当前线程释放它所持有的共享锁。

它首先会通过tryreleaseshared()去尝试释放共享锁。尝试成功,则直接返回;尝试失败,则通过doreleaseshared()去释放共享锁。

3. tryreleaseshared()

tryreleaseshared()定义在reentrantreadwritelock中,源码如下:

protected final boolean tryreleaseshared(int unused) {
  // 获取当前线程,即释放共享锁的线程。
  thread current = thread.currentthread();
  // 如果想要释放锁的线程(current)是第1个获取锁(firstreader)的线程,
  // 并且“第1个获取锁的线程获取锁的次数”=1,则设置firstreader为null;
  // 否则,将“第1个获取锁的线程的获取次数”-1。
  if (firstreader == current) {
    // assert firstreaderholdcount > 0;
    if (firstreaderholdcount == 1)
      firstreader = null;
    else
      firstreaderholdcount--;
  // 获取rh对象,并更新“当前线程获取锁的信息”。
  } else {
 
    holdcounter rh = cachedholdcounter;
    if (rh == null || rh.tid != current.getid())
      rh = readholds.get();
    int count = rh.count;
    if (count <= 1) {
      readholds.remove();
      if (count <= 0)
        throw unmatchedunlockexception();
    }
    --rh.count;
  }
  for (;;) {
    // 获取锁的状态
    int c = getstate();
    // 将锁的获取次数-1。
    int nextc = c - shared_unit;
    // 通过cas更新锁的状态。
    if (compareandsetstate(c, nextc))
      return nextc == 0;
  }
}

说明:tryreleaseshared()的作用是尝试释放共享锁。

4. doreleaseshared()

doreleaseshared()定义在aqs中,源码如下:

private void doreleaseshared() {
  for (;;) {
    // 获取clh队列的头节点
    node h = head;
    // 如果头节点不为null,并且头节点不等于tail节点。
    if (h != null && h != tail) {
      // 获取头节点对应的线程的状态
      int ws = h.waitstatus;
      // 如果头节点对应的线程是signal状态,则意味着“头节点的下一个节点所对应的线程”需要被unpark唤醒。
      if (ws == node.signal) {
        // 设置“头节点对应的线程状态”为空状态。失败的话,则继续循环。
        if (!compareandsetwaitstatus(h, node.signal, 0))
          continue;
        // 唤醒“头节点的下一个节点所对应的线程”。
        unparksuccessor(h);
      }
      // 如果头节点对应的线程是空状态,则设置“文件点对应的线程所拥有的共享锁”为其它线程获取锁的空状态。
      else if (ws == 0 &&
           !compareandsetwaitstatus(h, 0, node.propagate))
        continue;        // loop on failed cas
    }
    // 如果头节点发生变化,则继续循环。否则,退出循环。
    if (h == head)          // loop if head changed
      break;
  }
}

说明:doreleaseshared()会释放“共享锁”。它会从前往后的遍历clh队列,依次“唤醒”然后“执行”队列中每个节点对应的线程;最终的目的是让这些线程释放它们所持有的锁。

公平共享锁和非公平共享锁

和互斥锁reentrantlock一样,readlock也分为公平锁和非公平锁。

公平锁和非公平锁的区别,体现在判断是否需要阻塞的函数readershouldblock()是不同的。

公平锁的readershouldblock()的源码如下:

final boolean readershouldblock() {
  return hasqueuedpredecessors();
}

在公平共享锁中,如果在当前线程的前面有其他线程在等待获取共享锁,则返回true;否则,返回false。

非公平锁的readershouldblock()的源码如下:

final boolean readershouldblock() {
  return apparentlyfirstqueuedisexclusive();
}

在非公平共享锁中,它会无视当前线程的前面是否有其他线程在等待获取共享锁。只要该非公平共享锁对应的线程不为null,则返回true。

reentrantreadwritelock示例

 import java.util.concurrent.locks.readwritelock; 
 import java.util.concurrent.locks.reentrantreadwritelock; 
 
 public class readwritelocktest1 { 
 
   public static void main(string[] args) { 
     // 创建账户
     mycount mycount = new mycount("4238920615242830", 10000); 
     // 创建用户,并指定账户
     user user = new user("tommy", mycount); 
 
     // 分别启动3个“读取账户金钱”的线程 和 3个“设置账户金钱”的线程
     for (int i=0; i<3; i++) {
       user.getcash();
       user.setcash((i+1)*1000);
     }
   } 
 } 
 
 class user {
   private string name;      //用户名 
   private mycount mycount;    //所要操作的账户 
   private readwritelock mylock;  //执行操作所需的锁对象 
 
   user(string name, mycount mycount) {
     this.name = name; 
     this.mycount = mycount; 
     this.mylock = new reentrantreadwritelock();
   }
 
   public void getcash() {
     new thread() {
       public void run() {
         mylock.readlock().lock(); 
         try {
           system.out.println(thread.currentthread().getname() +" getcash start"); 
           mycount.getcash();
           thread.sleep(1);
           system.out.println(thread.currentthread().getname() +" getcash end"); 
         } catch (interruptedexception e) {
         } finally {
           mylock.readlock().unlock(); 
         }
       }
     }.start();
   }
 
   public void setcash(final int cash) {
     new thread() {
       public void run() {
         mylock.writelock().lock(); 
         try {
           system.out.println(thread.currentthread().getname() +" setcash start"); 
           mycount.setcash(cash);
           thread.sleep(1);
           system.out.println(thread.currentthread().getname() +" setcash end"); 
         } catch (interruptedexception e) {
         } finally {
           mylock.writelock().unlock(); 
         }
       }
     }.start();
   }
 }
 
 class mycount {
   private string id;     //账号 
   private int  cash;    //账户余额 
 
   mycount(string id, int cash) { 
     this.id = id; 
     this.cash = cash; 
   } 
 
   public string getid() { 
     return id; 
   } 
 
   public void setid(string id) { 
     this.id = id; 
   } 
 
   public int getcash() { 
     system.out.println(thread.currentthread().getname() +" getcash cash="+ cash); 
     return cash; 
   } 
 
   public void setcash(int cash) { 
     system.out.println(thread.currentthread().getname() +" setcash cash="+ cash); 
     this.cash = cash; 
   } 
 }

运行结果:

thread-0 getcash start
thread-2 getcash start
thread-0 getcash cash=10000
thread-2 getcash cash=10000
thread-0 getcash end
thread-2 getcash end
thread-1 setcash start
thread-1 setcash cash=1000
thread-1 setcash end
thread-3 setcash start
thread-3 setcash cash=2000
thread-3 setcash end
thread-4 getcash start
thread-4 getcash cash=2000
thread-4 getcash end
thread-5 setcash start
thread-5 setcash cash=3000
thread-5 setcash end

结果说明:

(01) 观察thread0和thread-2的运行结果,我们发现,thread-0启动并获取到“读取锁”,在它还没运行完毕的时候,thread-2也启动了并且也成功获取到“读取锁”。

因此,“读取锁”支持被多个线程同时获取。

(02) 观察thread-1,thread-3,thread-5这三个“写入锁”的线程。只要“写入锁”被某线程获取,则该线程运行完毕了,才释放该锁。

因此,“写入锁”不支持被多个线程同时获取。

上一篇:

下一篇: