欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

再看Lock

程序员文章站 2022-05-05 09:53:01
...

之前有对Lock接口做过简单介绍,Lock接口的实现基本都是通过聚合了一个同步器的子类来完成线程访问控制的,学习了AQS之后,再次来看Lock,内容主要摘自《Java并发编程的艺术》

重入锁

重入锁ReentrantLock,即支持重进入的锁,表示该锁能支持一个线程对资源的重复加锁,该锁还支持获取锁时的公平和非公平的选择

1.实现重进入

重进入指任意线程在获取到锁之后能够再次获取该锁而不会被锁阻塞,该特性的实现需要解决以下两个问题:
(1)线程再次获取锁,锁需要识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取
(2)锁的最终释放,要求锁对于获取进行计数自增,计数表示当前锁被重新获取的次数,锁被释放时计数自减,当计数等于0时表示锁已经成功释放

ReentrantLock通过组合自定义同步器来实现锁的获取与释放,以非公平锁实现为例,获取同步状态代码:

/*
通过判断当前线程是否为获取锁的线程来决定获取操作是否成功
如果是获取锁的线程再次请求,则将同步状态值进行增加并返回true,表示获取同步状态成功
成功获取锁的线程再次获取锁,只是增加了同步状态值
*/
final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
 //非公平锁,只要CAS设置同步状态成功则表示当前线程获取了锁
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
/*
释放同步状态时减少同步状态值
如果该锁被获取了n次,那么前(n-1)次tryRelease方法必须返回false,只有同步状态完全释放了才返回true
*/
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
 //将同步状态是否为0作为最终释放的条件
 //当同步状态为0,将占有线程设置为null,并返回true,表示释放成功
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

2.公平与非公平

公平性与否是针对获取锁而言的,如果一个锁是公平的,那么锁的获取顺序就应该符合请求时间的绝对顺序,即FIFO

上一节的非公平锁,只要CAS设置同步状态成功则表示当前线程获取了锁,而公平锁不同:

protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
/*
加入了同步队列中当前节点是否有前驱节点的判断
如果该方法返回true,表示有线程比当前线程更早请求锁
so需要等待前驱线程获取并释放锁之后才能继续获取锁
*/
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
 //到这里就是获取到锁了,标记一下告诉大家,现在是我占用了锁
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

公平锁保证了锁的获取按照FIFO原则,而代价是进行大量的线程切换,非公平锁虽然可能造成线程“饥饿”,但极少的线程切换保证了其更大的吞吐量

读写锁

接口ReadWriteLock,其实现类ReentrantReadWriteLock

读写锁在同一时刻允许多个线程访问,但在写线程访问时,所有的读线程和其他写线程均被阻塞,读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升

1.读写状态的设计

读写锁的自定义同步器需要在同步状态上维护多个读线程和一个写线程的状态,如果在一个整型变量上维护多种状态,即“按位切割使用”这个变量,读写锁将变量分为两个部分,高16位表示读,低16位表示写

那么读写锁是如何快速确定读和写各自状态?通过位运算,假设当前同步状态值为s,则写状态为s & 0x0000FFFF,读状态为s >>>16,当写状态增加1时,等于 s+1,当读状态增加1时,等于s+(1<<16)

推论:s不等于0时,当写状态等于0时,读状态大于0,即读锁已被获取

2.写锁的获取与释放

/*
如果当前线程已经获取了写锁,则增加写状态
如果当前线程在获取写锁时,读锁已经被获取,或者该线程不是已经获取写锁的线程,则当前线程进入等待
*/
     protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
// 存在读锁或者当前线程不是已经获取写锁的线程
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

锁的释放与ReentrantLock基本类似,每次释放均减少写状态,当写状态为0时表示写锁已经释放,从而等待的读写线程能够继续访问读写锁,同时前次写线程的修改对后续读写线程可见

3.读锁的获取与释放

/*
如果当前线程已经获取了读锁,则增加读状态
如果当前线程在获取读锁时,写锁已经被其他线程获取则进入等待
*/
      protected final int tryAcquireShared(int unused) {

            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

读锁的每次释放(线程安全的,可能有多个读线程同时释放读锁),均减少读状态,减少的值为(1<<16)

Condition接口

任意一个java对象,都拥有一组监视器方法,主要包括wait()、wait(long timeout)、notify()和notifyAll(),这些方法与synchronized配合可实现等待/通知模式,Condition与Lock配合也可实现

Condition接口提供的基本方法:

public interface Condition {
//  使当前线程等待,同时释放当前锁,当其他线程中使用signal或signalAll方法时,线程会重新获得锁并继续执行
//  或者当线程被中断时,也能跳出等待
     void await() throws InterruptedException;
//   和await方法基本相同,但并不会在等待过程中响应中断
     void awaitUninterruptibly();
//当前线程进入等待状态直到被通知、中断或者超时,返回值表示剩余时间
//如果在nanosTimeout纳秒之前被唤醒,返回值就是(nanosTimeout-实际耗时),如果返回值为0或者负数,则表示超时
     long awaitNanos(long nanosTimeout) throws InterruptedException;
     boolean await(long time, TimeUnit unit) throws InterruptedException;
//当前线程进入等待状态直到被通知、中断或者到某个时间
//如果没有到指定时间就被通知方法返回true,否则表示到了指定时间返回false
     boolean awaitUntil(Date deadline) throws InterruptedException;
//唤醒一个等待在Condition上的线程,该线程从等待方法返回前必须获得与Condition相关联的锁
     void signal();//唤醒一个在等待中的线程
     void signalAll();//唤醒所有在等待中的线程
}

1.等待队列

等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await方法,那么该线程就会释放锁、构成节点加入等待队列并进入等待状态,事实上,节点的定义复用了同步器中节点的定义
再看Lock
如图,Condition有首尾节点的引用,新增节点只需要将原有的尾节点nextWaiter指向它,并更新尾节点即可,该过程并没有使用CAS保证,因为调用await方法的线程必定是获取了锁的线程
再看Lock
如图,Condition的实现是同步器的内部类,so每个Condition的实例都能访问同步器提供的方法,相当于每个Condition都拥有所属同步器的引用

ConditionObject是同步器AbstractQueuedSynchronizer的内部类:

2.等待

调用Condition的await()方法会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态,当从await()方法返回时,当前线程一定获取了Condition相关联的锁

 //ConditionObject的await方法      
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            //当前线程加入等待队列
            Node node = addConditionWaiter();
            //释放同步状态,即释放锁
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态,如果不是通过其他线程调用Condition.signal()方法唤醒,而是对等待线程进行中断,则抛出异常InterruptedException
从队列的角度去看,当前线程加入Condition的等待队列如图,即同步队列中的首节点不会直接加入等待队列,而是通过addConditionWaiter()把当前线程构造成一个新的节点并将其加入等待队列
再看Lock

3.通知

调用Condition的signal()方法,会唤醒在等待队列中等待时间最长的结点(首节点),在唤醒节点之前,会将结点移到同步队列中

//调用该方法的前置条件是当前线程必须获取了锁
        public final void signal() {
        //进行isHeldExclusively()检查,即当前线程是否是获取了锁的线程
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
         //获取等待队列的首节点
         //将其移动到同步队列并使用LockSupport唤醒节点中的线程
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

其中,节点从等待队列移到同步队列如下:
再看Lock
通过调用同步器的enq(Node node)方法,等待队列中的头节点线程安全地移动到同步队列,当节点移动到同步队列后,当前线程再使用LockSupport唤醒该结点的线程
被唤醒后的线程,将从await方法中的while循环中退出,进而调用同步器的acquireQueued(final Node node, int arg)方法加入到获取同步状态的竞争中
成功获取同步状态后,被唤醒的线程将从先前调用的await()方法返回,此时该线程已经成功获取了锁
Condition的signalAll()方法,相当于对等待队列中的每个节点均执行一个signal方法,效果就是将等待队列中的所有节点全部移动到同步队列中,并唤醒每个节点的线程

相关标签: Lock Condition