欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Java中的锁(五)

程序员文章站 2022-06-04 11:33:00
...

本章主要介绍Java中semaphore的使用和底层实现原理。

一、semaphore的基本概念

semaphore又叫信号量,在Java1.5中引入,是用来控制同时访问共享资源的线程数量,通过协调各个线程,以保证合理的使用资源。semaphore底层是通过AQS实现线程管理的,提供两个构造函数,实现公平和非公平共享信号量。

//非公平信号量,同时可以允许多少线程(许可)进行访问,和线程等待时间长久没有关系
public Semaphore(int permits) {          
    sync = new NonfairSync(permits);
}

//公平共享信号量,即等待时间越久的越先获取许可
public Semaphore(int permits, boolean fair) {    
    sync = (fair)? new FairSync(permits) : new NonfairSync(permits);
}

使用示例: 

public class SemaphoreTest {
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        //设置信号量同时执行的线程数是5
        final Semaphore semp = new Semaphore(5);

        for(int i =0; i<10;i++){
            final int finalI = i;
            Runnable task = () -> {
                try {
                    semp.acquire();
                    System.out.println("Accessing: " + finalI);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {

                } finally {
                    semp.release();
                }
            };
            exec.submit(task);
        }
        exec.shutdown();
    }
}

Semphore类中提供的基本方法:

public class Semaphore implements java.io.Serializable{
    // 底层的Sync内部类,提供tryReleaseShared
    abstract static class Sync extends AbstractQueuedSynchronizer...
    
    static final class NonfairSync extends Sync...
    static final class FairSync extends Sync...

    // 获取一个信号量,可以被中断
    public void acquire() throws InterruptedException;
    // 获取一个中断信号,不能被中断
    public void acquireUninterruptibly();
    public boolean tryAcquire();
    public boolean tryAcquire(long timeout, TimeUnit unit)throws InterruptedException

    // 获取指定数量的信号量
    public void acquire(int permits) throws InterruptedException
    public void acquireUninterruptibly(int permits)
    public boolean tryAcquire(int permits)

    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)throws InterruptedException 
    // 释放一个信号量
    public void release()
    public void release(int permits)
    // 动态减小许可,当资源用完不能再用时,这时就可以减小许可证
    protected void reducePermits(int reduction)

}

二、Semaphore的底层实现原理

2.1 非公平锁中的共享锁

static final class NonfairSync extends Sync {
    NonfairSync(int permits) {
          super(permits);
    }
   //调用父类Sync的nonfairTryAcquireShared
   protected int tryAcquireShared(int acquires) {
       return nonfairTryAcquireShared(acquires);
   }
}

其中,permits传入值后,设置为AQS的state。

Sync(int permits) {
    setState(permits);
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
    // AQS 的state
    private volatile int state;

    protected final int getState() {return state;}

    protected final void setState(int newState) {state = newState;}

    //对state变量进行CAS 操作
    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
}

调用acquire()方法时,先判断state是否足够使用,如果state值代表的许可数足够使用,请求线程将会获得同步状态即对共享资源的访问权,并CAS更新state的值(一般是对state值减1);如果state值已为0,请求线程将无法获取同步状态,将被加入到同步队列并阻塞,直到其他线程释放同步状态才可能获取对共享资源的访问权。

// 中断性获取锁
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

// AQS中中断性获取锁
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
         throw new InterruptedException();

    //未获取成功加入同步队列等待
    if (tryAcquireShared(arg) < 0)
         doAcquireSharedInterruptibly(arg);
}

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
           final Node p = node.predecessor();
           if (p == head) {
               int r = tryAcquireShared(arg);
               if (r >= 0) {
                   setHeadAndPropagate(node, r);
                   p.next = null; // help GC
                   failed = false;
                   return;
               }
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

非常重要的一点,对于tryAcquire()方法,内部的实现直接在Semaphore中,不会加入等待队列

final int nonfairTryAcquireShared(int acquires) {
   for (;;) {
      int available = getState();
      int remaining = available - acquires;
      if (remaining < 0 || compareAndSetState(available, remaining))
           return remaining;
   }
}

对于acquireUninterruptibly(int permits)方法,内部依托于AQS的同步获取方法:

public void acquireUninterruptibly(int permits) {
   if (permits < 0) throw new IllegalArgumentException();
      sync.acquireShared(permits);
}
// 依赖AQS的同步获取
public final void acquireShared(int arg) {
   if (tryAcquireShared(arg) < 0)
       doAcquireShared(arg);
}

// 先查看一下Semaphore的非公平获取方法
protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

private void doAcquireShared(int arg) {
   // 首先加入等待队列
   final Node node = addWaiter(Node.SHARED);
   boolean failed = true;
   try {
       boolean interrupted = false;
       // 进入自旋操作
       for (;;) {
           final Node p = node.predecessor();
           // 如果前置节点时head
           if (p == head) {
               //如果前置节点是head,本节点尝试获取锁,并设置本节点为head,返回获取到锁
               int r = tryAcquireShared(arg);
               if (r >= 0) {
                   setHeadAndPropagate(node, r);
                   p.next = null; // help GC
                   if (interrupted)
                        selfInterrupt();
                   failed = false;
                   return;
               }
            }
            //调整同步队列中node结点的状态并判断是否应该被挂起
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
            }
       } finally {
            if (failed)
                cancelAcquire(node);
       }
    }

在获取同步状态时,加入同步队列后,如果前置节点不是head,会执行shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        // 如果前置节点等待被唤醒,需要执行后面的阻塞方法
        if (ws == Node.SIGNAL)
            return true;
        // 如果前置节点已经执行完毕,遍历同步队列,知道找到一个需要被唤醒的节点作为前置节点
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 如果前置节点状态小于0并且也不是SIGNAL,设置为SIGNAL状态
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

释放锁的方法release(),具体的实现有:

public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}
// AQS中释放的方法
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

// Semaphore中释放方法
protected final boolean tryReleaseShared(int releases) {
    // 自旋的方式设置state
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
        }
}

// 释放成功后,要唤醒后继节点
private void doReleaseShared() {
    // 如果头节点的后继节点需要唤醒,那么执行唤醒动作;否则将头结点的等待状态设置为PROPAGATE保证   
    // 唤醒传递。
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 如果头节点对应的线程是SIGNAL状态,头节点获取到同步状态,
            if (ws == Node.SIGNAL) {
                // 修改头结点对应的线程状态设置为0。失败的话,则继续循环。
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;

                // 唤醒头结点后继结点所对应的线程
                unparkSuccessor(h);
            }
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;             
        }
        // 如果头结点发生变化,则继续循环。否则,退出循环。
        if (h == head)               
            break;
    }
}


//唤醒传入结点的后继结点对应的线程
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
      if (ws < 0)
          compareAndSetWaitStatus(node, ws, 0);
       //拿到后继结点
      Node s = node.next;
      if (s == null || s.waitStatus > 0) {
          s = null;
          for (Node t = tail; t != null && t != node; t = t.prev)
              if (t.waitStatus <= 0)
                  s = t;
      }
      if (s != null)
          LockSupport.unpark(s.thread);
    }

2.2 公平锁中的共享锁

    static final class FairSync extends Sync {
        FairSync(int permits) {super(permits);}

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                // 这里于非公平锁的主要区别是:队列中是否有节点,如果有节点,直接加入同步等待队列
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

参考链接:https://blog.csdn.net/javazejian/article/details/76167357

https://blog.csdn.net/yucaixiang/article/details/89360898