欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

JUC之AQS

程序员文章站 2022-05-04 21:39:00
...

AbstractQueuedSynchronizer

先大致讲一下工作原理:AQS内部主要维护了一个Node类型的链表,用于储存排队的线程信息,当有新的需要阻塞的线程进来时一般创建一个node对象,加入到链表的尾部,当链表中首节点释放资源时调用LockSupport的unpark去唤醒等待的线程。

配上一个比较经典的图片

JUC之AQS

接下来是AQS主要的成员变量:

	//链表的首节点
	private transient volatile Node head;

	//链表的尾节点
	private transient volatile Node tail;

	//当前同步器的状态
	private volatile int state;

	//一般用于申请资源时判断是否自旋的一个时间阈值,当请求设置的超时时间小于这个时间的时候,就直接自旋而不是等待
	static final long spinForTimeoutThreshold = 1000L;

	//Unsafe对象,java中用于直接操作内存地址的工具,授信于jdk代码
	private static final Unsafe unsafe = Unsafe.getUnsafe();

	//下面可以理解为上述成员变量在对应对象的内存地址内的偏移量
	//waitStatus与next是AQS的内部类Node中的成员变量
	private static final long stateOffset;
	private static final long headOffset;
	private static final long tailOffset;
	private static final long waitStatusOffset;
	private static final long nextOffset;

接下来介绍一些核心方法

  • acquire(int arg)
    无视线程interrupt不断的申请资源
	/**
	* acquire用于排他的申请资源,其中tryAcquire是一个抽象方法,需要子类去自己实现。
	* 当第一次tryAcquire失败后,会添加一个排他的node到node链表中,然后不断的申请资源。
	* /
	public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    /**
    * 添加当前线程到等待的链表中,其中通过unsafe中的方法修改自己为tail节点
    * 如果失败了则进入enq中循环执行把自己设置为tail的操作
    */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    /**
    * 该方法循环执行以下逻辑
    * 当前线程前一个线程是head时,去执行tryAcquire方法尝试获取资源,同上tryAcquire为子类实现
    * 执行shouldParkAfterFailedAcquire判断在请求资源失败之后是否需要park(即阻塞自己),如果需要阻塞则执行parkAndCheckInterrupt
    */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    /**
    * 取链表中前驱节点的waitStatus
    * 如果是signal的,则需要阻塞自己,因为之前的节点在释放资源之后会唤醒自己
    * 如果状态>0的,即是cancelled,则往前找,一直找到不是cancelled状态的节点,然后把自己放在它后面
    * 如果状态是0或者-3,则需要把前驱的状态设置为signal的,此次自己不阻塞等下次自己再进入这个方法时,便会阻塞自己
    */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    /**
    * 执行LockSupport中的park方法,阻塞自己,等待其他线程释放资源时唤醒自己,执行下一行即return
    */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
  • acquireInterruptibly(int arg)
	/** 
	* 与accquire方法不同的是,当线程interrupt的时候,该方法会抛出InterruptedException
	*/
	public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }
    /**
    * 与之前acquireQueued不同的是,这里当执行到parkAndCheckInterrupt返回true的时候,即线程状态是interrupted状态时,会抛出异常
    * 这里补充一下,线程一般销毁是在run方法执行完后(stop destry这些不推荐使用),而执行线程的interrupt方法时,
    * 线程不会结束,只会去修改线程的状态,以下是网上处理interrupt的一些建议
    * 1)线程处于阻塞状态,如使用了sleep,同步锁的wait,socket中的receiver,accept等方法时,会使线程处于阻塞状态。
    * 当调用线程的interrupt()方法时,会抛出InterruptException异常。阻塞中的那个方法抛出这个异常,通过代码捕获该异常,
    * 然后break跳出循环状态,从而让我们有机会结束这个线程的执行。
    *  一定要先捕获InterruptedException异常之后通过break来跳出循环,才能正常结束run方法
    * 2)线程未处于阻塞状态,使用isInterrupted()判断线程的中断标志来退出循环。
    * 当使用interrupt()方法时,中断标志就会置true,和使用自定义的标志来控制循环是一样的道理
*/
    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
  • acquireShared(int arg)
    /**
    * 同样的是tryAcquireShared是个抽象方法,需要子类去实现。
    * 返回负值就表示失败,然后进入自旋
    */
	public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    /**
    * 这里先添加一个共享模式的node到链表尾部,然后自旋,大部分逻辑与acquireQueued是一样的
    * 判断是否需要阻塞
    */
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /*
         * 当前线程取得资源 或者 首节点为空 或者 首节点不是被cancel 或者 此时head为空 或者 此时head没有被cancel
         * 这些条件满足一条时 判断下个节点是否是空或者共享节点,是的话就调用doReleaseShared
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
  • acquireSharedInterruptibly(int arg)
    与排它模式的interrupt一样,就是线程被设置为interrupt状态后会抛出异常
	public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
  • release(int arg)
    释放资源
	/**
	*  tryRelease为抽象方法,需要子类实现
	*  这里tryRelease成功后,如果head不为空且waitStatus不为0就唤醒后继的阻塞线程
	*/
	public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

然后是两个内部类,先介绍Node这个内部类

成员变量

		/** 标志node为共享模式 */
        static final Node SHARED = new Node();
        
        /** 标志node为排他模式 */
        static final Node EXCLUSIVE = null;

        /** waitStatus的值,标志着线程被撤销了 */
        static final int CANCELLED =  1;
        
        /** waitStatus的值,标志着后续的线程需要被unpark,说简单点,就是处于这个状态的node,在head节点释放之后会被唤醒 */
        static final int SIGNAL    = -1;
        
        /** waitStatus的值,标志着线程正在等待某种条件满足 */
        static final int CONDITION = -2;
        
        /**
         * waitStatus的值,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。
         */
        static final int PROPAGATE = -3;
        
		/** waitStatus**/
		volatile int waitStatus;
		
		/** 前驱节点 **/
		volatile Node prev;
		
		/** 后继节点 **/
		volatile Node next;
		
		/**当前线程**/
		volatile Thread thread;
		
		/**下一个等待环节,指向排他类型的node,区分next在于next可能是共享模式的,方便快速定位到下一个等待线程**/
		Node nextWaiter;

另一个内部类ConditionObject,实现了Condition接口

成员变量

		/** First node of condition queue. */
        private transient Node firstWaiter;
        
        /** Last node of condition queue. */
        private transient Node lastWaiter;
        
		/** Mode meaning to reinterrupt on exit from wait */
        private static final int REINTERRUPT =  1;
        
        /** Mode meaning to throw InterruptedException on exit from wait */
        private static final int THROW_IE    = -1;
相关标签: JUC AQS