欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

AbstractQueuedSynchronizer部分源码解析

程序员文章站 2022-03-26 08:49:19
...
    /**
     * 加锁
     */
    public final void acquire(int arg) {
        // 首先尝试获取,如果获取成功直接退出;如果获取失败,即竞争锁失败,则创建Node,并添加到同步队列尾部。
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    private Node addWaiter(Node mode) {
        // 创建节点,mode此时为null,表示下一个等待节点为空
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    private Node enq(final Node node) {
        // 通过自旋操作,保证将当前节点设置到同步队列尾部
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                /* 前驱为头结点且尝试获取同步状态成功,则将当前节点设置为头节点,原头结点解勾;
                 并返回false,用于finally决定是否取消获取同步状态;
                 */
                if (p == head && tryAcquire(arg)) {
                    // 如果当前结点获取同步状态成功了,那么将当前节点设置为头结点。
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 如果获取同步状态失败,检查是否需要阻塞线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        /* 
            0:初始化
            CANCELLED =  1 :线程等待超时或者被中断了,需要从同步队列中移走
            SIGNAL    = -1 :后续的节点等待状态,当前节点,通知后面的节点去运行
            CONDITION = -2 :当前节点处于等待队列
            PROPAGATE = -3 :共享,表示状态要往后面的节点传播
        */    
        // 获取前驱的状态码
        int ws = pred.waitStatus;
        // 如果前驱状态码为 SIGNAL,则返回true,表示当前线程需要阻塞
        if (ws == Node.SIGNAL)
            return true;
        // 如果前驱状态码>0,即前驱已经取消,则将同步队列中从该节点起,所有状态为取消的前驱一律解勾
        if (ws > 0) {         
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 代码运行到这里,状态一般只可能为0、-2或者-3,此时将前驱状态设置为 SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    private final boolean parkAndCheckInterrupt() {
        // 阻塞当前线程,并返回是否被中断的状态,底层使用UNSAFE.park(false, 0L);public native void park(boolean var1, long var2);
        LockSupport.park(this);
        return Thread.interrupted();
    }

    /**
     * 释放锁
     */
    public final boolean release(int arg) {
        // 这里是模板方法,由子类实现;如果状态修改成功,则后续逻辑将释放锁
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        // s.waitStatus > 0,表明该结点已被取消
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 从尾部开始找到离头结点最近的一个非空结点,然后对该结点进行唤醒
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            // 唤醒线程,底层由UNSAFE.unpark(thread)实现。public native void unpark(Object var1);
            LockSupport.unpark(s.thread);
    }

 

相关标签: AQS