欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Java并发编程艺术之Java中的锁

程序员文章站 2022-05-04 21:41:33
...

Java并发编程艺术之Java中的锁

    本文章主要介绍Java并发包中与锁相关的API和组件,会从1)使用 、2)实现 两个方面进行介绍 ,下面是主要包含的内容:

  • Lock接口
  • 队列同步器(AQS)
  • 重入锁
  • 读写锁
  • LockSupport工具(暂未整理,后期补充)
  • Condition接口

一、Lock接口

    锁的简单介绍: 锁可以控制多个线程访问共享资源的方式,可以防止多个线程同时访问共享资源,
    锁的实现方式: 在不同JDK版本,实现锁的方式不同

  • JDK5 之前: 通过synchronized 关键字实现锁的功能
  • JDK5 之后: 增加并发包(java.util.concurrent)中Lock来实现锁功能(synchronized依然可以实现锁的功能)

    synchronized和Lock差异性

  • synchronized可以隐式的获取和释放锁(简化了同步管理、扩展性比较差), 而Lock需要显示的获取和释放锁
  • synchronized不具有中断获取锁、超时获取锁的功能等同步特性,Lock具有锁释放/获取的可操作性, 具有可中断获取锁,具有超时获取锁等同步特性

下面是Lock提供的synchronized关键字不具备的特性:

特性 描述
尝试非阻塞的获取锁 当前线程尝试获取锁,如果这一时刻锁没有被其它线程获取到,则成功获取并持有锁
能被中断的获取锁 与synchronized不同,获取到锁的线程可以响应中断,当获取到锁的线程响应中断时,会抛出中断异常,同时释放锁
超时获取锁 在指定的截至时间之前获取锁,如果截止时间之前任然没有获取到锁,则返回

    Lock API
Java并发编程艺术之Java中的锁

图1、Lock API

二、队列同步器(AQS)

    队列同步器(AbstractQueuedSynchronizer), 是用来构建或者其它同步组件的基础框架; 同步器的使用方式是继承,在抽象方法的实现过程中会通过getState()、setState(int newState)、compareAndSetState(int expect, int updateState)方法对同步状态进行修改 。

注:继承同步器(AQS)的子类推荐为同步组件的静态内部类,同步器自身没有实现任何同步接口,仅仅是定义了同步状态获取和释放的若干方法,以便供自定义同步组件使用,同步器即支持独占式的获取同步状态,也支持共享式的获取状态, 比如: ReentrantLock、ReentrantReadWriteLock、CountDownLatch。

    同步器(AQS)、自定义组件、自定义锁三者之间的关系 如下图:
Java并发编程艺术之Java中的锁

图2、同步器(AQS)、同步组件、自定义锁之间的关系

代码实现示例如下:

public class ReentrantLock implements Lock, java.io.Serializable { // 自定义锁, 此示例代码是重入锁   
    private final Sync sync;

    abstract static class Sync extends AbstractQueuedSynchronizer { //同步组件, 其中Sync的父类 AbstractQueuedSynchronizer 是队列同步器
        private static final long serialVersionUID = -5179523762034025860L;

        abstract void lock();

        final boolean nonfairTryAcquire(int acquires) {
            // ...nonfairTryAcquire 的实现逻辑
        }

        protected final boolean tryRelease(int releases) {
            // ... tryRelease 的实现逻辑
        }

        protected final boolean isHeldExclusively() {
            // ... isHeldExclusively 的实现逻辑
        }

        final ConditionObject newCondition() {
            // ... newCondition 的实现逻辑
        }
    }
}
  • 锁是面向实现者的,它定义了使用者和锁交互的接口,隐藏了实现的细节
  • 同步器是面向锁的实现者的,它简化了锁的实现方式,屏蔽了同步状态管理、线程排队、等待/唤醒 等底层操作 。

1. 队列同步器的接口与示例
    同步器是通过模板方法来设计的,因此使用者需要继承同步器,并重写指定的方法,随后将同步器组合在同步组件实现中,最后调用使用者重写的模板方法

下面是同步器可重写的方法和同步器提供的模板方法
Java并发编程艺术之Java中的锁

图3、同步器可重写的方法

Java并发编程艺术之Java中的锁

图4、同步器提供的模板方法

针对图4同步器提供的模板方法可以分为3类

  • 独占式获取和释放同步状态
  • 共享式获取和释放同步状态
  • 查询同步队列中等待线程的状况

这里以独占锁的示例初步了解同步器的工作原理
独占锁: 同一个时刻只有一个线程获取到锁, 而其它获取锁的线程只能处于同步队列中等待。
下面是代码示例:

public class MutexThread {
    //2. 将需要的操作代理到Sync上
    private final Sync sync = new Sync();

    public void lock(){
        sync.acquire(1);
    }

    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

    public boolean unlock() {
        return sync.release(1);
    }

    public Condition newCondition() {
        return sync.newCondition();
    }

    public boolean isLocked(){
        return sync.isHeldExclusively();
    }

    public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads() ;
    }

    public void lockInterrupterly() throws InterruptedException{
        sync.acquireInterruptibly(1);
    }

    //1. 定义自定义组件, 静态内部类
    private static class Sync extends AbstractQueuedSynchronizer {
        // 同步器是否被当前线程独占
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1 ;
        }
        //当state状态为0的时候获取锁
        @Override
        public boolean tryAcquire(int acquries) {
            if(compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false ;
        }
        //释放锁, 将状态设置为0
        @Override
        protected boolean tryRelease(int release) {
            if(getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        Condition newCondition() {
            return new ConditionObject();
        }
    }
}

针对上面示例代码做简单介绍:

  • MutexThread是一个自定义独占锁, 同一时刻只允许一个线程占有锁
  • Sync是静态内部类, 它继承了队列同步器(AbstractQueuedSynchronizer), 该内部类实现了独占式获取和释放同步状态
  • tryAcquire(int acquires)方法中,如果经过CAS设置成功,会将同步状态设置为1 , 表示获取同步状态成功了
  • tryRelease(int releases)方法中,只是将同步状态设置为0,表示释放同步状态成功,如果之前已经释放(getState() == 0)会抛出非法监视状态异常(IllegalMonitorStateException)
  • 使用者并不会直接和内部的同步器实现交互,而是通过MutexThread提供的方法,

2. 队列同步器的实现分析
    本小节包含的内容如下:

  • 同步队列
  • 独占式同步状态获取与释放
  • 共享式同步状态获取与释放
  • 超时获取同步状态

1) 同步队列
    同步队列是一个FIFO的双向队列,
1. 当线程获取同步状态资源失败时, 同步器(AQS)会将当前线程构造成一个Node, 并加入同步队列中,
2. 处于同步队列中的线程处于阻塞状态, 在同步状态释放时,会唤醒首节点中的线程,使其再次尝试获取同步状态

    阻塞队列中添加的节点(Node)信息定义如下:

static final class Node {

        static final Node SHARED = new Node();

        static final Node EXCLUSIVE = null;

        static final int PROPAGATE = -3;

        volatile int waitStatus;

        volatile Node prev;

        volatile Node next;

        volatile Thread thread;

        Node nextWaiter;

        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

从节点Node定义可知包含如下信息:

  • 获取同步状态失败线程的引用(volatile Thread thread ;)
  • 等待状态(volatile int waitStatus ;)
  • 当前线程的前驱节点(volatile Node prev) 和后继节点(volatile Node next)

Java并发编程艺术之Java中的锁

图5、节点的属性信息及描述

    关于同步队列添加节点/释放节点的操作,可以通过下面的图示进一步了解:
Java并发编程艺术之Java中的锁

图6、同步队列添加节点、释放节点

    针对上面的流程图做进一步说明:

  • 同步器中包含了两个节点类型的引用(pred, next), 他们分别指向头节点/尾节点
  • 当一个线程成功的获取到了同步状态,其它无法获取同步状态的线程将会被构造成节点Node,添加至同步队列中
  • 多线程环境下,为了保证节点添加至同步队列是线程安全的,同步器提供了一个基于CAS设置节点的方法compareAndSetTail(Node expect, Node update) , 只有expect节点与实际尾节点一致时才可以更新尾节点信息为update节点
  • 同步队列尊享FIFO规则,首节点是获取同步状态成功的状态,当该首节点释放同步状态时,会唤醒后继节点,被唤醒的后继节点会尝试获取同步状态,如果获取成功会将自己设置为新的首节点

2) 独占式同步状态获取与释放
    2.1) 获取: 这里以acquire(int arg)来说明同步器获取同步状态的流程, 下面时acquire()方法的代码:

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}

针对上面的代码做一些说明:

  • 上面的代码完成了同步状态的获取、节点构造、加入同步队列、以及在同步队列中自旋等待的过程
  • 首先通过tryAcquire(arg)尝试获取同步状态,如果没有获取到(返回false), 会创建独占式节点Node(Node.EXCLUSIVE)
  • 然后将新创建的节点Node, 添加(通过addWaiter方法)至同步队列的尾部
private Node addWaiter(Node mode) {
     Node node = new Node(Thread.currentThread(), mode);
     // Try the fast path of enq; backup to full enq on failure
     Node pred = tail;     // 获取源同步队列的尾节点
     if (pred != null) {   
         node.prev = pred;  //如果源队列不为空, 让新添加的节点的前驱节点指向源队列的尾节点
         if (compareAndSetTail(pred, node)) {  // 通过CAS设置新的尾节点
             pred.next = node;    // 设置源尾节点的后继节点信息为新的尾节点node
             return node;
         }
     }
     enq(node);   // 如果源同步队列为空,执行enq方法
     return node;
 }


private Node enq(final Node node) {
     for (;;) {   //以死循环的方式添加节点node至同步队列尾部
         Node t = tail;    // 获取源同步队列的尾节点
         if (t == null) { // 源同步队列尾空,需要初始化
             if (compareAndSetHead(new Node()))  // 通过CAS原理设置head节点为Node
                 tail = head;  // 让尾节(tail)点指向head结点
         } else {
             node.prev = t;   // 将新添加的节点的前驱结点设置为源尾节点 
             if (compareAndSetTail(t, node)) {  // 设置新的尾节点
                 t.next = node;  // 让源尾节点的后继节点为新添加的节点node
                 return t;
             }
         }
     }
 }
  • 最后通过acquireQueued(final Node node, int arg)方法获取同步状态,如果获取不到同步状态,则阻塞节点中的线程,其通过前驱节点出队来唤醒
        关于只有前驱节点为头节点才能尝试获取同步状态的原因如下:

    • 头节点是成功获取到同步状态的节点,头节点释放同步状态后会唤醒其后继节点,此时被唤醒的节点会检测其前驱节点是不是头节点
    • 维护同步队列的FIFO原则
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;   //设置中断状态为false
        for (;;) {
            final Node p = node.predecessor();  //获取当前节点node的前驱节点 p
            if (p == head && tryAcquire(arg)) { // 如果当前节点的前驱节点是头节点, 尝试获取同步状态
                setHead(node);  // 当前节点获取到同步状态, 设置当前节点为新的头节点
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt()) //更新节点的waitStatus信息
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);  //取消同步状态的获取,释放相关资源
    }
}

通过下面的流程图梳理前面的内容:
Java并发编程艺术之Java中的锁

图7、独占式同步状态获取流程

    2.2) 释放
下面是相关代码:

public final boolean release(int arg) {
    if (tryRelease(arg)) {  //释放资源
        Node h = head;  
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);//如果h节点不为null, 且waitStatus状态不为0, 执行unparkSuccessor方法
        return true;
    }
    return false;
}


private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0); // 设置waitStatus值为0
    Node s = node.next;
    if (s == null || s.waitStatus > 0) { //节点已经被取消,需要出队列
        s = null;
        //从后向前找到最靠前的合法节点
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;   // 出队后重写设置前驱、后继节点
    }
    if (s != null)
        LockSupport.unpark(s.thread); // 通过LockSupport.unpart()唤醒处于等待状态的线程
}

总结:在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移出队列(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点

3)共享式同步状态获取与释放
    共享式获取同步状态与独占式获取同步状态的区别:

  • 独占式: 同一时刻只能一个线程获取同步状态
  • 共享式: 统一时刻可以多个线程获取同步状态

下面式查一下图示:
Java并发编程艺术之Java中的锁

图8、独占式和共享式获取同步状态的区别

共享式获取同步状态的代码清单如下:

public final void acquireShared(int arg) {
    /**
    * tryAcquireShared(int arg) 尝试获取同步状态,返回值为int型
    * 如果 result >= 0 表示能够获取到同步状态
    * 如果 result <  0 表示需要通过自旋的方式获取同步状态 
    */
    if (tryAcquireShared(arg) < 0) 
        doAcquireShared(arg);
}

private void doAcquireShared(int arg) { // 共享式获取的自旋过程
     final Node node = addWaiter(Node.SHARED); // 设置添加的结点是共享式结点, 并通过addWaiter加入同步队列尾部
     boolean failed = true;
     try {
         boolean interrupted = false;
         for (;;) {
             final Node p = node.predecessor(); //获取node结点的前驱结点
             if (p == head) { // 如果p是头节点
                 int r = tryAcquireShared(arg); // 尝试获取同步状态
                 // 成功获取到同步状态并退出自旋的条件就是tryAcquireShared(int arg)方法返回值大于等于0 (r >= 0)
                 if (r >= 0) {                  
                     setHeadAndPropagate(node, r);  //设置头节点和propagate信息, propagate = r
                     p.next = null; // help GC
                     if (interrupted)
                         selfInterrupt();
                     failed = false;
                     return;
                 }
             }
             if (shouldParkAfterFailedAcquire(p, node) &&
                 parkAndCheckInterrupt()) // 设置失败获取同步状态之后处理方式
                 interrupted = true;
         }
     } finally {
         if (failed)
             cancelAcquire(node);
     }
 }

doAcquireShared(int arg) 方法在自旋的过程中,如果当前结点的前驱结点是头节点(head), 尝试获取同步状态,如果返回值 >= 0 , 表示此次获取同步状态成功,并从自旋过程中退出

    与独占式一样,共享式获取同步状态也需要释放同步状态,下面式相关的代码清单:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) { // 释放共享式同步状态
        doReleaseShared();  // 实际执行释放操作的方法
        return true;
    }
    return false;
}


private void doReleaseShared() {
     for (;;) {
         Node h = head;
         if (h != null && h != tail) { // 既不是空同步队列,也不是刚初始化的队列  (tail = head = new Node() )
             int ws = h.waitStatus; // 获取头节点的waitStatus状态信息
             if (ws == Node.SIGNAL) {  // SIGNAL=-1, 后继结点处于等待状态
                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 通过CAS设置新的waitStatus 为0, 表示初始状态
                     continue;            // loop to recheck cases
                 unparkSuccessor(h); // 通知后继结点
             }
             else if (ws == 0 &&
                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // waitStatus = 0 是初始结点, 设置新的waitStatus=-3,表示下次共享式获取同步状态可以无限继续下去
                 continue;                // loop on failed CAS
         }
         if (h == head)                   // loop if head changed
             break;
     }
 }

针对上面的代码: 释放同步状态之后,将会唤醒后继处于等待状态的结点,对于能够支持多个线程通知访问的组件,他和独占式主要区别在于tryReleaseShared(int arg)必须确保同步状态线程安全释放, 这里是通过无限循环和CAS来保证的

4、独占式超时获取同步状态
    通过调用同步器(AQS)的doAcquireNanos(int arg,long nanosTimeout)方法可以超时获取同步状态,即在指定的时间段内获取同步状态,如果成功获取同步状态则返回true, 否则返回false, 该方法提供了synchronized所不具有的特性(超时获取)
    在介绍超时获取同步状态之前先介绍一下中断式获取同步状态

  • JDK5之前,当一个线程获取不到锁而被阻塞在synchronized之外时,如果对该线程进行中断操作,该线程的中断标志位会被修改,但是线程依旧会阻塞在synchronized上,等待获取锁
  • JDK5之后(含),同步器提供了acquireInterruptibly(int arg)方法,在等待获取同步状态时,如果当前线程被中断,会立即返回,并抛出InterruptedException异常

    超时获取同步状态被视为中断获取同步状态的“增强版” , 它不仅提供了响应式中断,并且在中断的基础上,增加了超时获取的特性
下面时代码清单:

private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
     if (nanosTimeout <= 0L)
         return false;
     final long deadline = System.nanoTime() + nanosTimeout; //计算超时时间
     final Node node = addWaiter(Node.EXCLUSIVE);
     boolean failed = true;
     try {
         for (;;) {
             final Node p = node.predecessor();
             if (p == head && tryAcquire(arg)) {
                 setHead(node);
                 p.next = null; // help GC
                 failed = false;
                 return true;
             }
             nanosTimeout = deadline - System.nanoTime(); // 用超时时间 - 当前时间, 如果 <= 0 表示已经超时
             if (nanosTimeout <= 0L)
                 return false;
             if (shouldParkAfterFailedAcquire(p, node) &&
                 nanosTimeout > spinForTimeoutThreshold)
                 LockSupport.parkNanos(this, nanosTimeout);
             if (Thread.interrupted())
                 throw new InterruptedException();
         }
     } finally {
         if (failed)
             cancelAcquire(node);
     }
 }

针对上面的代码, 方法在自旋的过程中, 如果当前结点的前驱结点为头节点(head),尝试获取同步状态,
1. 如果获取成功,设置相关信息,并返回
2. 如果 获取同步状态失败,判断是否已经超时
  2.1 如果还未超时,重新计算时间间隔,使当前线程继续等待
  2.2 如果已经到超时时间, 该线程会从LockSupport.parkNanos(Object
blocker,long nanos)方法返回

注意: 如果nanosTimeout小于等spinForTimeoutThreshold(1000纳秒)时,将不会使该线程进行超时等待,而是进入快速的自旋过程。原因在于,非常短的超时等待无法做到十分精确,如果这时再进行超时等待,相反会让nanosTimeout的超时从整体上表现得反而不精确。因此,在超时非常短的场景下,同步器会进入无条件的快速自旋

可以通过下面的流程图进一步了解独占式超时获取同步状态的流程:
Java并发编程艺术之Java中的锁

图9、独占式超时获取同步状态的流程

针对上面的流程图, 独占式获取同步状态和独占式超时获取同步状态的区别如下:
未获取到同步状态时:

  • 独占式一直处于等待状态
  • 独占式超时会使当前线程等待nanosTimeout纳秒,如果超时之前还未获取同步状态,会从等待逻辑中返回

三、重入锁(ReentrantLock)

     重入锁(ReentrantLock), 是指支持重进入的锁, 它表示一个线程对资源重复加锁,
    1) synchronized关键字隐式的支持重进入,比如一个synchronized修饰的递归方法,方法在执行的过程中,执行线程可以连续多次的获取锁
    2)ReentrantLock没有synchronized关键字一样隐式重进入功能,但是在调用lock方法时,已经获取锁的线程,再次调用lock方法时能够获取锁而不被阻塞。

1) 实现重进入

    为了满足重进入的功能,即已经获取到锁的线程再次获取锁时不被阻塞 , 需要满足下面的条件

  • 线程再次获取锁。需要能够识别获取锁的线程是否当前已经占据锁的线程,如果是,则成功获取
  • 锁的最终释放。(1) 线程重复N次的获取锁,随后在第N次释放锁之后,其它线程能够获取到该锁。(2) 需要一个计数变量来表示锁获取与释放的状态,当计数值 > 0 时,表示锁重复获取次数, 当 计数值 = 0 时, 表示锁已经成功释放

下面以非公平实现说明,代码清单如下:

final boolean nonfairTryAcquire(int acquires) {
     final Thread current = Thread.currentThread();
      int c = getState();
      if (c == 0) {// 如果同步状态 = 0
          if (compareAndSetState(0, acquires)) { //通过CAS设置state值为acquires
              setExclusiveOwnerThread(current); //设置独占线程为当前线程
              return true;
          }
      }
      else if (current == getExclusiveOwnerThread()) { //c != 0 , 且获取锁的线程与当前占有锁的线程一致
          int nextc = c + acquires;  //当前占有锁的线程数量
          if (nextc < 0) // overflow
              throw new Error("Maximum lock count exceeded");
          setState(nextc); // 设置state值为nextc
          return true;
      }
      return false;
  }

    针对上面的代码: 通过判断当前线程是否为获取锁的线程来决定获取操作是否成功,如果是当前占有锁的线程再次获取锁,则将同步状态值进行增加并返回true,表示获取同步状态成功。
    因为成功获取锁的线程再次获取锁,只是增加了同步状态值,ReentrantLock在释放同步状态时会减少同步状态值 ,释放同步状态的代码清单如下:

protected final boolean tryRelease(int releases) {
     int c = getState() - releases;  // 减少已经获取的同步状态数量
     if (Thread.currentThread() != getExclusiveOwnerThread()) // 获取同步状态的锁与当前占有锁的线程不一样 , 抛出异常
         throw new IllegalMonitorStateException();
     boolean free = false;
     if (c == 0) {
         free = true;
         setExclusiveOwnerThread(null); // 如果获取同步状态的线程完全释放, 设置释放状态置为true, 且独占式占有的线程置为null
     }
     setState(c); // 设置新的同步状态值
     return free;
 }

    如果该锁被获取了N次,那么前(n-1)次tryRelease(int release)方法必须返回false, 而只有同步状态完全释放了,才能返回true, 这里将同步状态是否为0(status = 0)作为最终释放的条件,当同步状态为0时,将占有线程设置为null, 并返回true,表示释放成功。

2) 公平与非公平获取锁的区别

    如果锁的获取是公平的,那么获取锁的顺序符合请求的绝对时间顺序,也就是FIFO,公平锁的代码清单如下:

protected final boolean tryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      int c = getState();
      if (c == 0) {
          if (!hasQueuedPredecessors() &&
              compareAndSetState(0, acquires)) {// 如果没有前驱结点(false),设置同步状态 = acquires,如果存在前驱结点(true)跳出判断
              setExclusiveOwnerThread(current); // 设置独占式获取同步状态的线程为当前线程
              return true;
          }
      }
      else if (current == getExclusiveOwnerThread()) { // c > 0 ,且获取同步状态的线程与当前占有锁的线程相同,执行重进入操作
          int nextc = c + acquires;
          if (nextc < 0) // 整型数值溢出
              throw new Error("Maximum lock count exceeded");
          setState(nextc);
          return true;
      }
      return false;
  }

    对比公平与非公平获取锁的方式, 唯一不同的对方在于公平方式判断条件多了前驱结点是否存在(hasQueuedPredecessors())的判断, 如果hasQueuedPredecessors() 返回true, 则表示有线程更早的获取了锁,需要在同步队列中等待前驱结点释放锁之后才能继续获取锁 。
    为了进一步说明公平锁与非公平锁之间的区别,通过下面示例代码进行举例说明:

public class FairAndUnfairThreadTest {
    private static Lock fairLock = new ReentrantLock2(true);
    private static Lock unfairLock = new ReentrantLock2(false);
    @Test
    public void fair() {
        testLock(fairLock);
    }
    @Test
    public void unfair() {
        testLock(unfairLock);
    }

    private void testLock(Lock lock) {
        //创建并启动5个线程
        for(int i = 0 ; i < 5 ; i++) {
            Job job = new Job(lock);
            job.start();
        }
    }

    static class Job extends Thread {
        private Lock lock  ;

        public Job(Lock lock) {
            this.lock = lock ;
        }

        @Override
        public void run() {
            //输出两次信息
            //第一次输出
            lock.lock();
            try {
                System.out.println(
                        "Locked by " + this.getName() + " , Waiting by " + getThreadNameList(lock));
            } finally {
                lock.unlock();
            }
            //第二次输出
            lock.lock();
            try {
                System.out.println(
                        "Locked by " + this.getName() + " , Waiting by " + getThreadNameList(lock));
            } finally {
                lock.unlock();
            }
        } 

        private List<String> getThreadNameList(Lock lock) {
            List<String> result = new ArrayList<>();

            ReentrantLock2 rtl = (ReentrantLock2) lock ;
            Collection<Thread> threads = rtl.getQueuedThreads();
            for(Thread t : threads) {
                result.add(t.getName());
            }

            return result;
        }

    }

    static class ReentrantLock2 extends ReentrantLock {
        private static final long serialVersionUID = 3336608472338842216L;

        public ReentrantLock2(boolean fair) {
            super(fair);
        }

        public Collection<Thread> getQueuedThreads() {
            List<Thread> arrayList = new ArrayList<>(super.getQueuedThreads());
            Collections.reverse(arrayList);

            return arrayList;
        }
    }

    public static void main(String[] args) {
        FairAndUnfairThreadTest test = new FairAndUnfairThreadTest() ;
        //1. 直接调用方法进行测试
        //test.fair();
        test.unfair();

        //2. 也可以使用@Test注解进行测试, 但是没有main方法输出的结果明显
    }
}

上面代码的输出结果如下

1. Fair输出结果

Locked by Thread-0 , Waiting by [Thread-1, Thread-2, Thread-3, Thread-4]
Locked by Thread-1 , Waiting by [Thread-2, Thread-3, Thread-4, Thread-0]
Locked by Thread-2 , Waiting by [Thread-3, Thread-4, Thread-0, Thread-1]
Locked by Thread-3 , Waiting by [Thread-4, Thread-0, Thread-1, Thread-2]
Locked by Thread-4 , Waiting by [Thread-0, Thread-1, Thread-2, Thread-3]
Locked by Thread-0 , Waiting by [Thread-1, Thread-2, Thread-3, Thread-4]
Locked by Thread-1 , Waiting by [Thread-2, Thread-3, Thread-4]
Locked by Thread-2 , Waiting by [Thread-3, Thread-4]
Locked by Thread-3 , Waiting by [Thread-4]
Locked by Thread-4 , Waiting by []

2. UnFair的输出结果如下:

Locked by Thread-0 , Waiting by [Thread-2, Thread-1, Thread-3, Thread-4]
Locked by Thread-0 , Waiting by [Thread-2, Thread-1, Thread-3, Thread-4]
Locked by Thread-2 , Waiting by [Thread-1, Thread-3, Thread-4]
Locked by Thread-2 , Waiting by [Thread-1, Thread-3, Thread-4]
Locked by Thread-1 , Waiting by [Thread-3, Thread-4]
Locked by Thread-1 , Waiting by [Thread-3, Thread-4]
Locked by Thread-3 , Waiting by [Thread-4]
Locked by Thread-3 , Waiting by [Thread-4]
Locked by Thread-4 , Waiting by []
Locked by Thread-4 , Waiting by []

正对上面的代码输出结果,有下面的结论:

  • 公平锁每次都是从同步队列中的第一个结点获取到锁
  • 非公平锁出现了线程连续获取锁的情况

四、 读写锁

    读写锁维护了一对锁(读/写锁),通过分别维护读锁/写锁来提高性能,1) 读锁, 可以允许多个读线程进行访问 , 但是写线程不允许访问; 2) 写锁, 写线程占有锁时,读线程和其它线程将会被阻塞
    读写锁带来的好处:

  • 不仅可以保证写操作对读操作可见,还可以带来并发性能的提升
  • 可以简化读写交互场景的编程方式

1) 读写锁的接口与示例
下是是ReentrantReadWriteLock的特性 及 ReentrantReadWriteLock展示内部工作状态方法的图示 :
Java并发编程艺术之Java中的锁

图10、ReentrantReadWriteLock的特性

Java并发编程艺术之Java中的锁

图11、ReentrantReadWriteLock展示内部工作状态的方法

为了进一步说明上面图示中说明的信息,这里通过下面的代码示例进行说明:

public class CacheThread {
    static Map<String , Object> map = new HashMap<String,Object>();
    static ReentrantReadWriteLock rrwl = new ReentrantReadWriteLock();
    static WriteLock wLock = rrwl.writeLock();
    static ReadLock rLock = rrwl.readLock();

    //获取一个key对应的值
    public static final Object get(String key) {
        rLock.lock();
        try {
            return map.get(key);
        } finally {
            rLock.unlock();
        }
    }

    //设置key对应的值, 并返回旧的value
    public static final Object put(String key , Object value) {
        wLock.lock();
        try {
            return map.put(key, value);
        } finally {
            wLock.unlock();
        }
    }

    //情况所有的内容
    public static final void clear() {
        wLock.lock();
        try {
            map.clear();
        } finally {
            wLock.unlock();
        }
    }
}

针对上面的代码:

  • 在读操作get(String key)方法中,需要获取读锁,这使得并发访问该方法时不会被阻塞
  • 写操作put(String key,Object value)方法和clear()方法,在更新HashMap时必须提前获取写锁,当获取写锁后,其他线程对于读锁和写锁的获取均被阻塞,而只有写锁被释放之后,其他读写操作才能继续

CacheThread使用读写锁提升读操作的性能,也保证每次写操作对读操作是可见的,同时简化的编程方式

2) 读写锁的实现分析
这里从四个方面进行说明:

  • 读写状态的设计
  • 写锁的获取与释放
  • 读锁的获取与释放
  • 锁降级

2.1 读写状态的设计
    读写锁需要在同步状态上维护多个读线程和一个写线程的状态,因此该同步状态的设计方式显得十分重要, 这里是通过“按位切割”来满足要求,通过将变量切分成高16位读,低16位写的两部分来满足要求
下面是图示:
Java并发编程艺术之Java中的锁

图11、读写锁状态的划分方式

针对上面图示, 当前同步状态表示一个线程已经获取了写锁, 并且重入了两次,同时也连续获取了两次读锁。这里通过位运算来快速计算读/写状态,具体如下:

  • 写: status & ox0000FFFF
  • 读:status >>> 16
  • 写状态 + 1 : status + 1
  • 读状态 + 1 : status + (1 << 16)

2.2 写锁的获取与释放
    写锁是一个支持重进入的排他锁

  • 如果当前线程已经获取了写锁,则增加写状态
  • 如果当前线程在获取写锁时,读锁已经被获取(读状态 != 0)获取其它写线程获取了锁,则当前线程进入等待状态

下面是获取写锁的代码清单:

protected final boolean tryAcquire(int acquires) {
     Thread current = Thread.currentThread();
      int c = getState(); // 获取状态信息
      int w = exclusiveCount(c); // 获取写状态信息
      if (c != 0) {
          // (Note: if c != 0 and w == 0 then shared count != 0)
          if (w == 0 || current != getExclusiveOwnerThread()) // 写数量 = 0 , 表示读线程占有锁或其它写线程占有锁
              return false;
          if (w + exclusiveCount(acquires) > MAX_COUNT) // 数值溢出, 抛出异常
              throw new Error("Maximum lock count exceeded");
          // Reentrant acquire
          setState(c + acquires); // 设置状态值
          return true;
      }
      if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) // 写线程应该被阻塞 或 设置state失败
          return false;
      setExclusiveOwnerThread(current);
      return true;
  }

针对上面的代码: 该方法除了重入条件,增加了一个读取读锁是否存在的判断exclusiveCount(c), 如果存在读锁,则写锁不能被获取, 原因如下:

  • 如果读锁已经被获取的情况下,若再次对写锁获取,则正在运行的其它线程无法感知当前写线程的操作, 只有等读操作是否了锁,写操作才能获取锁,在写线程获取到锁之后,其它线程的读/写操作将会被阻塞

2.3 读锁的获取与释放
    读锁是一个支持重进入的共享锁, 他能够被多个线程同时获取,

  • 如果当前线程已经获取了读锁,会增加读状态
  • 如果当前线程获取读状态时,写锁已经被其它线程获取,则进入等待状态

下面是获取读锁的代码清单:

protected final int tryAcquireShared(int unused) {
      Thread current = Thread.currentThread();
      int c = getState();
      if (exclusiveCount(c) != 0 &&
          getExclusiveOwnerThread() != current) // 其它线程获取到了写锁
          return -1;
      int r = sharedCount(c);  // 获取读状态的值
      if (!readerShouldBlock() &&
          r < MAX_COUNT &&
          compareAndSetState(c, c + SHARED_UNIT)) { // 如果读不允许阻塞,且读状态没有溢出, 则设置新的读状态
          if (r == 0) { // 读状态为0, 表示首次获取
              firstReader = current;
              firstReaderHoldCount = 1;
          } else if (firstReader == current) { // r != 0 , 且是同一个线程
              firstReaderHoldCount++;
          } else { // HoldCounter 设置相关信息(HoldCounter的作用后面继续分析)
              HoldCounter rh = cachedHoldCounter;
              if (rh == null || rh.tid != getThreadId(current))
                  cachedHoldCounter = rh = readHolds.get();
              else if (rh.count == 0)
                  readHolds.set(rh);
              rh.count++;
          }
          return 1;
      }
      return fullTryAcquireShared(current);
  }

针对上面的代码:

  • 如果其它线程已经获取了写锁,则当前线程获取读锁失败,进入等待状态
  • 如果当前线程获取了读锁,或者写锁没有被获取,则成功获取读锁

2.4 锁降级
首先先观察分析下面的代码清单:

public void processData() {
    readLock.lock();
    if (!update) {
        // 必须先释放读锁
        readLock.unlock();
        // 锁降级从写锁获取到开始
        writeLock.lock();
        try {
            if (!update) {
                // 准备数据的流程(略)
                update = true;
            }
            readLock.lock();
        } finally {
            writeLock.unlock();
        }
        // 锁降级完成,写锁降级为读锁
    }
    try {
        // 使用数据的流程(略)
    } finally {
        readLock.unlock();
    }
}

针对上面的代码:
数据发生变化后, udpate变量被设置为false, 此时访问processDate()方法的线程都会感知到变化,都是只有一个线程可以能够获取到写锁,其它线程会被阻塞在读锁和写锁的lock方法上, 当前线程获取写锁完成数据准备之后,再获取读锁,随后释放写锁,完成锁的降级

五、Condition接口

    常见的实现等待/通知模式存在下面的两种方式:

  • 任意Java对象都存在一组监视器方法(java.lang.Object), 主要包括wait()、wait(long timeout)、notify()、 notifyAll(), 这些方法与synchronized关键字配合,可以实现等待/通知模式
  • Condition接口与Lock配合使用可以实现等待通知模式

Object监视器方法与Condition接口分别实现等待/通知模式的对比如下:
Java并发编程艺术之Java中的锁

图12、Object的监视器方法与Condition接口的对比

4.1 Condition接口示例
    Condition定义了等待/通知两种类型方法,线程在调用这些方法时,需要提前获取Condition对象的锁。Condition对象是由Lock对象创建而来,也就是Condition对象依赖Lock对象,下面的代码示例说明Condition的使用方式:

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void conditionWait() throws InterruptedException {
    lock.lock();
    try {
        condition.await();
    } finally {
        lock.unlock();
    }
} 
public void conditionSignal() throws InterruptedException {
    lock.lock();
    try {
        condition.signal();
    } finally {
        lock.unlock();
    }
}

针对上面的代码: 一般会将Condition对象视作为成员变量,1)当调用await()方法后,当前线程会释放锁并在此等待,2) 当其它线程调用Condition对象的signal()方法,通知当前线程后,线程会从await()返回,并且在返回前已经获取到了锁信息。Java并发编程艺术之Java中的锁

图13、Condition的(部分)方法以及描述

通过如下有界队列代码示例来进一步了解Condition的使用方式:

public class BoundedQueueThread<T> {
    private Object[] items;
    // 添加的下标,删除的下标和数组当前数量
    private int addIndex, removeIndex, count;
    private Lock lock = new ReentrantLock();
    private Condition notEmpty = lock.newCondition();
    private Condition notFull = lock.newCondition();

    public BoundedQueue(int size) {
        items = new Object[size];
    }

    // 添加一个元素,如果数组满,则添加线程进入等待状态,直到有"空位"
    public void add(T t) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();
            items[addIndex] = t;
            if (++addIndex == items.length)
                addIndex = 0;
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    // 由头部删除一个元素,如果数组空,则删除线程进入等待状态,直到有新添加元素
    @SuppressWarnings("unchecked")
    public T remove() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await();
            Object x = items[removeIndex];
            if (++removeIndex == items.length)
                removeIndex = 0;
            --count;
            notFull.signal();
            return (T) x;
        } finally {
            lock.unlock();
        }
    }
}

针对上面的代码,在添加和删除方法中使用while而不是if,目的是防止过早或意外的通知,只有条件满足的情况下才退出for循环。

4.2 Conditon的实现分析
    每个ConditionObject都包含着一个队列(等待队列),而这个队列是实现等待/通知的关键,下面将从下面几个部分来说明:

  • 等待队列
  • 等待和通知

等待队列
    等待队列是一个FIFO的队列,在队列中每个线程都包含一个线程引用,当调用Condition.await()方法,该线程将会释放锁,构造结点加入等待队列,并进入等待状态
    ConditionObject拥有首节点(firstWaiter)和尾结点(lastWaiter), 当前线程调用Condition.await(), 将会以当前线程构造结点,并将线程加入队列尾部,该队列的基本结构如下图:
Java并发编程艺术之Java中的锁

图14、等待队列的基本结构

    从结构图可以看出,ConditionObject拥有首尾结点引用,只需要将原有的尾结点nextWaiter指向新增结点,并且更新lastWaiter结点信息为新增结点。
    在Object的监视器模型上,一个对象拥有一个同步队列和一个等待队列,而同步器拥有一个同步队列和多个等待队列,其图示如下:
Java并发编程艺术之Java中的锁

图15、同步队列与等待队列

ConditionObject实现的是同步器的内部类,因此每个ConditionObject实例都能够访问同步器提供的方法,相当于每个ConditionObject都拥有所属同步器的引用。

等待/通知

等待
    等待:调用await()或者以await()开头的方法。在调用该方法之后,线程会释放锁进入等待状态,当接收到signal信号后,会从个等待状态返回,此时线程已经获取到了ConditionObject相关联的锁。
    执行await()方法,从队列的角度来看是将同步队列的首节点(获取了锁的结点)移动到等待队列
下面是await相关的代码清单

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            long savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

针对上面的代码清单,调用该方法之后,同步队列中的首节点会将当前线程构造成结点并加入等待队列中,然后唤醒同步队列中的后继结点,最后当前结点进入等待状态。
处于等待状态(调用了await()方法)的线程被唤醒的方式:

  • 调用了Condition.signal()方法主动进行唤醒
  • 等待线程被中断退出,会抛出InterruptedException

下面是线程加入等待队列图示:
Java并发编程艺术之Java中的锁

图16、当前线程加入等待队列

唤醒
    唤醒:调用signal或者以signal开头的方法,调用该方法后会唤醒等待队列中等待时间最长的结点,在唤醒结点前会将其移动到同步队列
下面是代码清单:

 public final void signal() {
   if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
 }

private void doSignal(Node first) {
    do {
         if ( (firstWaiter = first.nextWaiter) == null)
             lastWaiter = null;
         first.nextWaiter = null;
     } while (!transferForSignal(first) &&
              (first = firstWaiter) != null);
 }
final boolean transferForSignal(Node node) {
   if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
         return false;
     Node p = enq(node);
     int ws = p.waitStatus;
     if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
         LockSupport.unpark(node.thread);
     return true;
 }

调用该方法的前置条件是当前线程必须获取了锁,这里是通过isHeldExclusively()方法进行检查,接着获取等待队列首节点,将其移动到同步队列并使用LockSupport唤醒结点中的线程。
下面是唤醒线程的图示:
Java并发编程艺术之Java中的锁

图17、节点从等待队列移动到同步队列

    通过调用同步器的enq方法,等待队列中的头节点将线程安全的移动到同步队列尾部,当结点移动到同步队列后,当前结点使用LockSupport唤醒该结点线程。
    被唤醒后的线程,将从await()方法中的while循环退出(参见上面 await方法的代码清单中isOnSyncQueue(Node node)方法),进而调用同步器的acquireQueued()方法加入到获取同步状态的竞争中
    Condition的signalAll()方法,相当于对等待队列中的每个节点均执行一次signal()方法,效果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点线程

六、LockSupport工具(暂未整理,后期补充)

参考资料:
1. 并发编程艺术
2. https://ifeve.com/

相关标签: java concurrent