欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

盘一盘 AQS和ReentrantLock

程序员文章站 2022-04-18 18:29:33
AQS是个啥? AQS(AbstractQueuedSynchronizer)是Java并发用来构建锁和其他同步组件的基础框架。许多同步类实现都依赖于它,如常用的ReentrantLock/ReentrantReadWriterLock/CountDownLatch等 AQS提供了独占(Exclus ......

aqs是个啥?

aqs(abstractqueuedsynchronizer)是java并发用来构建锁和其他同步组件的基础框架。许多同步类实现都依赖于它,如常用的reentrantlock/reentrantreadwriterlock/countdownlatch等
 
aqs提供了独占(exclusive)以及共享(share)两种资源共享方式:
acquire(acquireshare)/release(releaseshare)。 
acquire:获取资源,如果当前资源满足条件,则直接返回,否则挂起当前线程,将该线程加入到队列排队。
release:释放资源,唤醒挂起线程
 
 

aqs队列

aqs队列示意图

盘一盘 AQS和ReentrantLock

 

aqs队列中的主要属性

//  等待队列头部
private transient volatile node head;

// 等待队列尾部
private transient volatile node tail;

// 锁的状态(加锁成功则为1,解锁为0,重入再+1)
private volatile int state;

//  当前持有锁的线程,注意这个属性是从abstractownablesynchronizer继承而来
private transient thread exclusiveownerthread;

 

node类中的主要属性

static final class node {
    // 标记表示节点正在共享模式中等待
    static final node shared = new node();
    // 标记表示节点正在独占模式下等待
    static final node exclusive = null;

    // 节点的等待状态 还有一个初始化状态0 不属于以下四种状态
    // 表示node所代表的当前线程已经取消了排队,即放弃获取锁
    static final int cancelled =  1;
    // 当一个节点的waitstatus被置为signal,就说明它的下一个节点(即它的后继节点)已经被挂起了(或者马上就要被挂起了),
    // 只要前继结点释放锁,就会通知标识为signal状态的后继结点的线程执行
    static final int signal    = -1;
    // 节点在等待队列中
    // 当其他线程对condition调用了signal()后,该节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中
    static final int condition = -2;
    // 表示下一次共享式同步状态获取,将会无条件地传播下去
    static final int propagate = -3;

    // 节点等待状态,该字段初始化为0,
    volatile int waitstatus;

        // 当前节点的前置节点
    volatile node prev;

    // 当前节点的后置节点
    volatile node next;

        // 在此节点上排队的线程信息
    volatile thread thread;
}

 

reentrantlock实现

在引入reentrantlock实现前,我先来科普一下 util.concurrent包的作者doug lea,相比较其他而言,并发包的源码阅读难度较大。脸上永远挂着谦逊腼腆笑容的doug lea先生使用了大量相对复杂的逻辑判断,比如一个判断条件中执行多个或且方法,让你很难跟上他的节奏,很难揣摩他的设计思想。小声逼逼,还不是我太菜了,留下来没有技术的泪水。
盘一盘 AQS和ReentrantLock

 

继承关系图

reentrantlock是lock接口的一个实现类,是一种可重入的独占锁。
reentrantlock内部通过内部类实现了aqs框架(abstractqueuedsynchronizer)的api来实现独占锁的功能。
盘一盘 AQS和ReentrantLock

 

主要属性

private final sync sync;

// 公平锁内部是fairsync,非公平锁内部是nonfairsync。
// 两者都通过继承 sync间接继承自abstractqueuedsynchronizer这个抽象类
abstract static class sync extends abstractqueuedsynchronizer {
    private static final long serialversionuid = -5179523762034025860l;
    
    // 加锁
    abstract void lock();

    // 尝试获取锁
    final boolean nonfairtryacquire(int acquires) {
        final thread current = thread.currentthread();
        int c = getstate();
        if (c == 0) {
            if (compareandsetstate(0, acquires)) {
                setexclusiveownerthread(current);
                return true;
            }
        }
        else if (current == getexclusiveownerthread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new error("maximum lock count exceeded");
            setstate(nextc);
            return true;
        }
        return false;
    }

    // 尝试释放锁
    protected final boolean tryrelease(int releases) {
        int c = getstate() - releases;
        if (thread.currentthread() != getexclusiveownerthread())
            throw new illegalmonitorstateexception();
        boolean free = false;
        if (c == 0) {
            free = true;
            setexclusiveownerthread(null);
        }
        setstate(c);
        return free;
    }
}

 

构造方法

//默认创建一个非公平锁
public reentrantlock() {
    sync = new nonfairsync();
}

//传入true创建公平锁,false非公平锁
public reentrantlock(boolean fair) {
    sync = fair ? new fairsync() : new nonfairsync();
}

 

reentrantlock公平锁

我们以公平锁为例对其中重要方法源码分析

// 继承了 sync,从而间接继承了 abstractqueuedsynchronizer这个抽象类
static final class fairsync extends sync {
    private static final long serialversionuid = -3000897897090466540l;

       // 上锁
    final void lock() {
        //调用 aqs 中 acquire方法
        acquire(1);
    }

    protected final boolean tryacquire(int acquires) {
        final thread current = thread.currentthread();
        int c = getstate();
        if (c == 0) {
            if (!hasqueuedpredecessors() &&
                compareandsetstate(0, acquires)) {
                // cas操作设置 state
                // 设置当前线程为拥有锁的线程
                setexclusiveownerthread(current);
                return true;
            }
        }

        else if (current == getexclusiveownerthread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new error("maximum lock count exceeded");
            setstate(nextc);
            return true;
        }
        return false;
    }
}

 

acquire方法源码分析

public final void acquire(int arg) {
    // tryacquire(arg)尝试加锁,如果加锁失败则会调用acquirequeued方法加入队列去排队,如果加锁成功则不会调用
    if (!tryacquire(arg) &&
        acquirequeued(addwaiter(node.exclusive), arg))
        selfinterrupt();
}
acquire方法干了这么几件事情
1、tryacquire() 尝试获取资源,如果成功则直接返回;
2、addwaiter() 将该线程加入等待队列, 更新aqs队列链信息
3、acquirequeued() 使线程在等待队列中获取资源,直到获取资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
4、selfinterrupt() 自我中断,如果线程在等待过程中被中断过,它是不响应的。只是获取资源后再将中断补上。
 

tryacquire方法

protected final boolean tryacquire(int acquires) {
    // 获取当前线程
    final thread current = thread.currentthread();
    // 获取lock对象的上锁状态,如果锁是*状态则=0,如果被上锁则为1,大于1表示重入
    int c = getstate();

    // c=0 代表没人占用锁,当前线程可以直接获取锁资源执行
    if (c == 0) {
        // 下面介绍hasqueuedpredecessors()方法,判断自己是否需要排队
        if (!hasqueuedpredecessors() &&
            compareandsetstate(0, acquires)) {
            // cas操作设置 state
            // 设置当前线程为拥有锁的线程
            setexclusiveownerthread(current);
            return true;
        }
    }

    // 非重入锁直接返回false,加锁失败
    else if (current == getexclusiveownerthread()) {
        // 若为重入锁, state 加1 (acquires)
        int nextc = c + acquires;
        if (nextc < 0)
            throw new error("maximum lock count exceeded");
        setstate(nextc);
        return true;
    }
    return false;
}

 

hasqueuedpredecessors方法

public final boolean hasqueuedpredecessors() {
    // 获取队列头、尾节点信息
    node t = tail; 
    node h = head;
    node s;
    // h != t 有几种情况
    // 1、队列尚未初始化完成,第一个线程获取锁资源,
    //    此时h和t都是null, h != t返回fasle初始化队列
    // 2、队列已经被初始化了,其他的线程尝试获取资源,
    //    此时头尾节点不相同,h!=t返回true,
    //    继续判断s.thread != thread.currentthread() 当前来参与竞争锁的线程和第一个排队的线程是同一个线程,则需要排队。
    // 3、队列已经被初始化了,但是由于锁释放的原因导致队列里面只有一个数据
    return h != t &&
        ((s = h.next) == null || s.thread != thread.currentthread());
}

 

addwaiter方法

private node addwaiter(node mode) {
    // aqs队列中的元素类型为node,需要把当前线程封装成为一个node对象
    node node = new node(thread.currentthread(), mode);

    // tail为队尾,赋值给pred
    node pred = tai
    // 判断pred是否为空,其实就是判断队尾是否有节点,其实只要队列被初始化了队尾肯定不为空,
    if (pred != null) {
        // 拼装node队列链的过程
        // 直接把当前线程封装的node的上一个节点设置成为pred即原来的队尾
        node.prev = pred;
        if (compareandsettail(pred, node)) {
            // pred的下一个节点设置为当node
            pred.next = node;
            return node;
        }
    }

    // 拼接aqs队列链
    enq(node);
    return node;
}


private node enq(final node node) {
    for (;;) {
        node t = tail;
        if (t == null) { // must initialize
            if (compareandsethead(new node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareandsettail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

 

acquirequeued方法

final boolean acquirequeued(final node node, int arg) {
    // 标记是否成功拿到资源
    boolean failed = true;
    try {
        // 标记等待过程中是否被中断过
        boolean interrupted = false;
        // 自旋
        for (;;) {
            final node p = node.predecessor();
            // 判断自己是否为队列中的第二个节点
            // 成为队列中第二个节点才有资格获取资源
            if (p == head && tryacquire(arg)) {
                sethead(node);
                p.next = null; // help gc
                failed = false;
                // 返回等待过程中是否被中断过
                return interrupted;
            }
            if (shouldparkafterfailedacquire(p, node) &&
                parkandcheckinterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelacquire(node);
    }
}

 

公平锁和非公平锁的主要区别

为了方便对比,在这里列举了两种锁的上锁过程源码,注意红色标识片段

// 公平锁上锁过程
final void lock() {
    //调用 aqs 中 acquire方法
    acquire(1);
}  

 

// 非公平锁上锁过程
final void lock() {
    // 尝试获取锁,加锁不成功则排队。排队之前仅有的一次插队机会。
    if (compareandsetstate(0, 1))
        setexclusiveownerthread(thread.currentthread());
    else
        acquire(1);
}

 

 

总结

1、如果第一个线程尝试获取资源时,此时和aqs队列无关,线程直接持有锁。并且不会初始化队列,如果接下来的线程都是交替执行,那么和aqs队列永远无关,均为线程直接持有锁。
2、在线程发生资源竞争的情况下,才会初始化aqs队列,aqs队列的头部永远是一个虚拟的thread为null的node。
3、未能获取到资源的线程将会处于park状态,此时只有队列中第二个node等待被唤醒,尝试去获取资源。其他node并不去竞争资源,这也是aqs队列的精髓所在,减少了cpu的占用。
4、公平锁的上锁是必须判断自己是不是需要排队;而非公平锁是直接进行cas修改计数器看能不能加锁成功;如果加锁不成功则乖乖排队(调用acquire);所以不管公平还是不公平;只要进到了aqs队列当中那么他就会排队;一朝排队;永远排队!