欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

死磕 java同步系列之Semaphore源码解析

程序员文章站 2022-07-11 17:14:28
Semaphore是什么? Semaphore具有哪些特性? Semaphore通常使用在什么场景中? Semaphore的许可次数是否可以动态增减? Semaphore如何实现限流? ......

问题

(1)semaphore是什么?

(2)semaphore具有哪些特性?

(3)semaphore通常使用在什么场景中?

(4)semaphore的许可次数是否可以动态增减?

(5)semaphore如何实现限流?

简介

semaphore,信号量,它保存了一系列的许可(permits),每次调用acquire()都将消耗一个许可,每次调用release()都将归还一个许可。

特性

semaphore通常用于限制同一时间对共享资源的访问次数上,也就是常说的限流。

下面我们一起来学习java中semaphore是如何实现的。

类结构

死磕 java同步系列之Semaphore源码解析

semaphore中包含了一个实现了aqs的同步器sync,以及它的两个子类fairsync和nonfairsync,这说明semaphore也是区分公平模式和非公平模式的。

源码分析

基于之前对于reentrantlock和reentrantreadwritelock的分析,这篇文章相对来说比较简单,之前讲过的一些方法将直接略过,有兴趣的可以拉到文章底部查看之前的文章。

内部类sync

// java.util.concurrent.semaphore.sync
abstract static class sync extends abstractqueuedsynchronizer {
    private static final long serialversionuid = 1192457210091910933l;
    // 构造方法,传入许可次数,放入state中
    sync(int permits) {
        setstate(permits);
    }
    // 获取许可次数
    final int getpermits() {
        return getstate();
    }
    // 非公平模式尝试获取许可
    final int nonfairtryacquireshared(int acquires) {
        for (;;) {
            // 看看还有几个许可
            int available = getstate();
            // 减去这次需要获取的许可还剩下几个许可
            int remaining = available - acquires;
            // 如果剩余许可小于0了则直接返回
            // 如果剩余许可不小于0,则尝试原子更新state的值,成功了返回剩余许可
            if (remaining < 0 ||
                compareandsetstate(available, remaining))
                return remaining;
        }
    }
    // 释放许可
    protected final boolean tryreleaseshared(int releases) {
        for (;;) {
            // 看看还有几个许可
            int current = getstate();
            // 加上这次释放的许可
            int next = current + releases;
            // 检测溢出
            if (next < current) // overflow
                throw new error("maximum permit count exceeded");
            // 如果原子更新state的值成功,就说明释放许可成功,则返回true
            if (compareandsetstate(current, next))
                return true;
        }
    }
    // 减少许可
    final void reducepermits(int reductions) {
        for (;;) {
            // 看看还有几个许可
            int current = getstate();
            // 减去将要减少的许可
            int next = current - reductions;
            // 检测举出
            if (next > current) // underflow
                throw new error("permit count underflow");
            // 原子更新state的值,成功了返回true
            if (compareandsetstate(current, next))
                return;
        }
    }
    // 销毁许可
    final int drainpermits() {
        for (;;) {
            // 看看还有几个许可
            int current = getstate();
            // 如果为0,直接返回
            // 如果不为0,把state原子更新为0
            if (current == 0 || compareandsetstate(current, 0))
                return current;
        }
    }
}

通过sync的几个实现方法,我们获取到以下几点信息:

(1)许可是在构造方法时传入的;

(2)许可存放在状态变量state中;

(3)尝试获取一个许可的时候,则state的值减1;

(4)当state的值为0的时候,则无法再获取许可;

(5)释放一个许可的时候,则state的值加1;

(6)许可的个数可以动态改变;

内部类nonfairsync

// java.util.concurrent.semaphore.nonfairsync
static final class nonfairsync extends sync {
    private static final long serialversionuid = -2694183684443567898l;
    // 构造方法,调用父类的构造方法
    nonfairsync(int permits) {
        super(permits);
    }
    // 尝试获取许可,调用父类的nonfairtryacquireshared()方法
    protected int tryacquireshared(int acquires) {
        return nonfairtryacquireshared(acquires);
    }
}

非公平模式下,直接调用父类的nonfairtryacquireshared()尝试获取许可。

内部类fairsync

// java.util.concurrent.semaphore.fairsync
static final class fairsync extends sync {
    private static final long serialversionuid = 2014338818796000944l;
    // 构造方法,调用父类的构造方法
    fairsync(int permits) {
        super(permits);
    }
    // 尝试获取许可
    protected int tryacquireshared(int acquires) {
        for (;;) {
            // 公平模式需要检测是否前面有排队的
            // 如果有排队的直接返回失败
            if (hasqueuedpredecessors())
                return -1;
            // 没有排队的再尝试更新state的值
            int available = getstate();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareandsetstate(available, remaining))
                return remaining;
        }
    }
}

公平模式下,先检测前面是否有排队的,如果有排队的则获取许可失败,进入队列排队,否则尝试原子更新state的值。

构造方法

// 构造方法,创建时要传入许可次数,默认使用非公平模式
public semaphore(int permits) {
    sync = new nonfairsync(permits);
}
// 构造方法,需要传入许可次数,及是否公平模式
public semaphore(int permits, boolean fair) {
    sync = fair ? new fairsync(permits) : new nonfairsync(permits);
}

创建semaphore时需要传入许可次数。

semaphore默认也是非公平模式,但是你可以调用第二个构造方法声明其为公平模式。

下面的方法在学习过前面的内容看来都比较简单,彤哥这里只列举semaphore支持的一些功能了。

以下的方法都是针对非公平模式来描述。

acquire()方法

public void acquire() throws interruptedexception {
    sync.acquiresharedinterruptibly(1);
}

获取一个许可,默认使用的是可中断方式,如果尝试获取许可失败,会进入aqs的队列中排队。

acquireuninterruptibly()方法

public void acquireuninterruptibly() {
    sync.acquireshared(1);
}

获取一个许可,非中断方式,如果尝试获取许可失败,会进入aqs的队列中排队。

tryacquire()方法

public boolean tryacquire() {
    return sync.nonfairtryacquireshared(1) >= 0;
}

尝试获取一个许可,使用sync的非公平模式尝试获取许可方法,不论是否获取到许可都返回,只尝试一次,不会进入队列排队。

tryacquire(long timeout, timeunit unit)方法

public boolean tryacquire(long timeout, timeunit unit)
    throws interruptedexception {
    return sync.tryacquiresharednanos(1, unit.tonanos(timeout));
}

尝试获取一个许可,先尝试一次获取许可,如果失败则会等待timeout时间,这段时间内都没有获取到许可,则返回false,否则返回true;

release()方法

public void release() {
    sync.releaseshared(1);
}

释放一个许可,释放一个许可时state的值会加1,并且会唤醒下一个等待获取许可的线程。

acquire(int permits)方法

public void acquire(int permits) throws interruptedexception {
    if (permits < 0) throw new illegalargumentexception();
    sync.acquiresharedinterruptibly(permits);
}

一次获取多个许可,可中断方式。

acquireuninterruptibly(int permits)方法

public void acquireuninterruptibly(int permits) {
    if (permits < 0) throw new illegalargumentexception();
    sync.acquireshared(permits);
}

一次获取多个许可,非中断方式。

tryacquire(int permits)方法

public boolean tryacquire(int permits) {
    if (permits < 0) throw new illegalargumentexception();
    return sync.nonfairtryacquireshared(permits) >= 0;
}

一次尝试获取多个许可,只尝试一次。

tryacquire(int permits, long timeout, timeunit unit)方法

public boolean tryacquire(int permits, long timeout, timeunit unit)
    throws interruptedexception {
    if (permits < 0) throw new illegalargumentexception();
    return sync.tryacquiresharednanos(permits, unit.tonanos(timeout));
}

尝试获取多个许可,并会等待timeout时间,这段时间没获取到许可则返回false,否则返回true。

release(int permits)方法

public void release(int permits) {
    if (permits < 0) throw new illegalargumentexception();
    sync.releaseshared(permits);
}

一次释放多个许可,state的值会相应增加permits的数量。

availablepermits()方法

public int availablepermits() {
    return sync.getpermits();
}

获取可用的许可次数。

drainpermits()方法

public int drainpermits() {
    return sync.drainpermits();
}

销毁当前可用的许可次数,对于已经获取的许可没有影响,会把当前剩余的许可全部销毁。

reducepermits(int reduction)方法

protected void reducepermits(int reduction) {
    if (reduction < 0) throw new illegalargumentexception();
    sync.reducepermits(reduction);
}

减少许可的次数。

总结

(1)semaphore,也叫信号量,通常用于控制同一时刻对共享资源的访问上,也就是限流场景;

(2)semaphore的内部实现是基于aqs的共享锁来实现的;

(3)semaphore初始化的时候需要指定许可的次数,许可的次数是存储在state中;

(4)获取一个许可时,则state值减1;

(5)释放一个许可时,则state值加1;

(6)可以动态减少n个许可;

(7)可以动态增加n个许可吗?

彩蛋

(1)如何动态增加n个许可?

答:调用release(int permits)即可。我们知道释放许可的时候state的值会相应增加,再回头看看释放许可的源码,发现与reentrantlock的释放锁还是有点区别的,semaphore释放许可的时候并不会检查当前线程有没有获取过许可,所以可以调用释放许可的方法动态增加一些许可。

(2)如何实现限流?

答:限流,即在流量突然增大的时候,上层要能够限制住突然的大流量对下游服务的冲击,在分布式系统中限流一般做在网关层,当然在个别功能中也可以自己简单地来限流,比如秒杀场景,假如只有10个商品需要秒杀,那么,服务本身可以限制同时只进来100个请求,其它请求全部作废,这样服务的压力也不会太大。

使用semaphore就可以直接针对这个功能来限流,以下是代码实现:

public class semaphoretest {
    public static final semaphore semaphore = new semaphore(100);
    public static final atomicinteger failcount = new atomicinteger(0);
    public static final atomicinteger successcount = new atomicinteger(0);

    public static void main(string[] args) {
        for (int i = 0; i < 1000; i++) {
            new thread(()->seckill()).start();
        }
    }

    public static boolean seckill() {
        if (!semaphore.tryacquire()) {
            system.out.println("no permits, count="+failcount.incrementandget());
            return false;
        }

        try {
            // 处理业务逻辑
            thread.sleep(2000);
            system.out.println("seckill success, count="+successcount.incrementandget());
        } catch (interruptedexception e) {
            // todo 处理异常
            e.printstacktrace();
        } finally {
            semaphore.release();
        }
        return true;
    }
}

推荐阅读

1、 死磕 java同步系列之开篇

2、 死磕 java魔法类之unsafe解析

3、 死磕 java同步系列之jmm(java memory model)

4、 死磕 java同步系列之volatile解析

5、 死磕 java同步系列之synchronized解析

6、 死磕 java同步系列之自己动手写一个锁lock

7、 死磕 java同步系列之aqs起篇

8、 死磕 java同步系列之reentrantlock源码解析(一)——公平锁、非公平锁

9、 死磕 java同步系列之reentrantlock源码解析(二)——条件锁

10、 死磕 java同步系列之reentrantlock vs synchronized

11、 死磕 java同步系列之reentrantreadwritelock源码解析

欢迎关注我的公众号“彤哥读源码”,查看更多源码系列文章, 与彤哥一起畅游源码的海洋。

死磕 java同步系列之Semaphore源码解析