详解springboot+aop+Lua分布式限流的最佳实践
一、什么是限流?为什么要限流?
不知道大家有没有做过帝都的地铁,就是进地铁站都要排队的那种,为什么要这样摆长龙转圈圈?答案就是为了限流
!因为一趟地铁的运力是有限的,一下挤进去太多人会造成站台的拥挤、列车的超载,存在一定的安全隐患。同理,我们的程序也是一样,它处理请求的能力也是有限的,一旦请求多到超出它的处理极限就会崩溃。为了不出现最坏的崩溃情况,只能耽误一下大家进站的时间。
限流是保证系统高可用的重要手段!!!
由于互联网公司的流量巨大,系统上线会做一个流量峰值的评估,尤其是像各种秒杀促销活动,为了保证系统不被巨大的流量压垮,会在系统流量到达一定阈值时,拒绝掉一部分流量。
限流会导致用户在短时间内(这个时间段是毫秒级的)系统不可用,一般我们衡量系统处理能力的指标是每秒的qps
或者tps
,假设系统每秒的流量阈值是1000,理论上一秒内有第1001个请求进来时,那么这个请求就会被限流。
二、限流方案
1、计数器
java内部也可以通过原子类计数器atomicinteger
、semaphore
信号量来做简单的限流。
// 限流的个数 private int maxcount = 10; // 指定的时间内 private long interval = 60; // 原子类计数器 private atomicinteger atomicinteger = new atomicinteger(0); // 起始时间 private long starttime = system.currenttimemillis(); public boolean limit(int maxcount, int interval) { atomicinteger.addandget(1); if (atomicinteger.get() == 1) { starttime = system.currenttimemillis(); atomicinteger.addandget(1); return true; } // 超过了间隔时间,直接重新开始计数 if (system.currenttimemillis() - starttime > interval * 1000) { starttime = system.currenttimemillis(); atomicinteger.set(1); return true; } // 还在间隔时间内,check有没有超过限流的个数 if (atomicinteger.get() > maxcount) { return false; } return true; }
2、漏桶算法
漏桶算法思路很简单,我们把水比作是请求
,漏桶比作是系统处理能力极限
,水先进入到漏桶里,漏桶里的水按一定速率流出,当流出的速率小于流入的速率时,由于漏桶容量有限,后续进入的水直接溢出(拒绝请求),以此实现限流。
3、令牌桶算法
令牌桶算法的原理也比较简单,我们可以理解成医院的挂号看病,只有拿到号以后才可以进行诊病。
系统会维护一个令牌(token
)桶,以一个恒定的速度往桶里放入令牌(token
),这时如果有请求进来想要被处理,则需要先从桶里获取一个令牌(token
),当桶里没有令牌(token
)可取时,则该请求将被拒绝服务。令牌桶算法通过控制桶的容量、发放令牌的速率,来达到对请求的限制。
4、redis + lua
很多同学不知道lua
是啥?个人理解,lua
脚本和 mysql
数据库的存储过程比较相似,他们执行一组命令,所有命令的执行要么全部成功或者失败,以此达到原子性。也可以把lua
脚本理解为,一段具有业务逻辑的代码块。
而lua
本身就是一种编程语言,虽然redis
官方没有直接提供限流相应的api
,但却支持了 lua
脚本的功能,可以使用它实现复杂的令牌桶或漏桶算法,也是分布式系统中实现限流的主要方式之一。
相比redis
事务,lua脚本
的优点:
- 减少网络开销: 使用lua脚本,无需向redis 发送多次请求,执行一次即可,减少网络传输
- 原子操作:redis 将整个lua脚本作为一个命令执行,原子,无需担心并发
- 复用:lua脚本一旦执行,会永久保存 redis 中,,其他客户端可复用
lua
脚本大致逻辑如下:
-- 获取调用脚本时传入的第一个key值(用作限流的 key) local key = keys[1] -- 获取调用脚本时传入的第一个参数值(限流大小) local limit = tonumber(argv[1]) -- 获取当前流量大小 local curentlimit = tonumber(redis.call('get', key) or "0") -- 是否超出限流 if curentlimit + 1 > limit then -- 返回(拒绝) return 0 else -- 没有超出 value + 1 redis.call("incrby", key, 1) -- 设置过期时间 redis.call("expire", key, 2) -- 返回(放行) return 1 end
- 通过keys[1] 获取传入的key参数
- 通过argv[1]获取传入的limit参数
- redis.call方法,从缓存中get和key相关的值,如果为null那么就返回0
- 接着判断缓存中记录的数值是否会大于限制大小,如果超出表示该被限流,返回0
- 如果未超过,那么该key的缓存值+1,并设置过期时间为1秒钟以后,并返回缓存值+1
这种方式是本文推荐的方案,具体实现会在后边做细说。
5、网关层限流
限流常在网关这一层做,比如nginx
、openresty
、kong
、zuul
、spring cloud gateway
等,而像spring cloud - gateway
网关限流底层实现原理,就是基于redis + lua
,通过内置lua
限流脚本的方式。
三、redis + lua 限流实现
下面我们通过自定义注解
、aop
、redis + lua
实现限流,步骤会比较详细,为了小白能让快速上手这里啰嗦一点,有经验的老鸟们多担待一下。
1、环境准备
springboot
项目创建地址:,很方便实用的一个工具。
2、引入依赖包
pom文件中添加如下依赖包,比较关键的就是 spring-boot-starter-data-redis
和 spring-boot-starter-aop
。
<dependencies> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-web</artifactid> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-data-redis</artifactid> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-aop</artifactid> </dependency> <dependency> <groupid>com.google.guava</groupid> <artifactid>guava</artifactid> <version>21.0</version> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-test</artifactid> </dependency> <dependency> <groupid>org.apache.commons</groupid> <artifactid>commons-lang3</artifactid> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-test</artifactid> <scope>test</scope> <exclusions> <exclusion> <groupid>org.junit.vintage</groupid> <artifactid>junit-vintage-engine</artifactid> </exclusion> </exclusions> </dependency> </dependencies>
3、配置application.properties
在 application.properties
文件中配置提前搭建好的 redis
服务地址和端口。
spring.redis.host=127.0.0.1 spring.redis.port=6379
4、配置redistemplate实例
@configuration public class redislimiterhelper { @bean public redistemplate<string, serializable> limitredistemplate(lettuceconnectionfactory redisconnectionfactory) { redistemplate<string, serializable> template = new redistemplate<>(); template.setkeyserializer(new stringredisserializer()); template.setvalueserializer(new genericjackson2jsonredisserializer()); template.setconnectionfactory(redisconnectionfactory); return template; } }
限流类型枚举类
/** * @author fu * @description 限流类型 * @date 2020/4/8 13:47 */ public enum limittype { /** * 自定义key */ customer, /** * 请求者ip */ ip; }
5、自定义注解
我们自定义个@limit
注解,注解类型为elementtype.method
即作用于方法上。
period
表示请求限制时间段,count
表示在period
这个时间段内允许放行请求的次数。limittype
代表限流的类型,可以根据请求的ip
、自定义key
,如果不传limittype
属性则默认用方法名作为默认key。
/** * @author fu * @description 自定义限流注解 * @date 2020/4/8 13:15 */ @target({elementtype.method, elementtype.type}) @retention(retentionpolicy.runtime) @inherited @documented public @interface limit { /** * 名字 */ string name() default ""; /** * key */ string key() default ""; /** * key的前缀 */ string prefix() default ""; /** * 给定的时间范围 单位(秒) */ int period(); /** * 一定时间内最多访问次数 */ int count(); /** * 限流的类型(用户自定义key 或者 请求ip) */ limittype limittype() default limittype.customer; }
6、切面代码实现
/** * @author fu * @description 限流切面实现 * @date 2020/4/8 13:04 */ @aspect @configuration public class limitinterceptor { private static final logger logger = loggerfactory.getlogger(limitinterceptor.class); private static final string unknown = "unknown"; private final redistemplate<string, serializable> limitredistemplate; @autowired public limitinterceptor(redistemplate<string, serializable> limitredistemplate) { this.limitredistemplate = limitredistemplate; } /** * @param pjp * @author fu * @description 切面 * @date 2020/4/8 13:04 */ @around("execution(public * *(..)) && @annotation(com.xiaofu.limit.api.limit)") public object interceptor(proceedingjoinpoint pjp) { methodsignature signature = (methodsignature) pjp.getsignature(); method method = signature.getmethod(); limit limitannotation = method.getannotation(limit.class); limittype limittype = limitannotation.limittype(); string name = limitannotation.name(); string key; int limitperiod = limitannotation.period(); int limitcount = limitannotation.count(); /** * 根据限流类型获取不同的key ,如果不传我们会以方法名作为key */ switch (limittype) { case ip: key = getipaddress(); break; case customer: key = limitannotation.key(); break; default: key = stringutils.uppercase(method.getname()); } immutablelist<string> keys = immutablelist.of(stringutils.join(limitannotation.prefix(), key)); try { string luascript = buildluascript(); redisscript<number> redisscript = new defaultredisscript<>(luascript, number.class); number count = limitredistemplate.execute(redisscript, keys, limitcount, limitperiod); logger.info("access try count is {} for name={} and key = {}", count, name, key); if (count != null && count.intvalue() <= limitcount) { return pjp.proceed(); } else { throw new runtimeexception("you have been dragged into the blacklist"); } } catch (throwable e) { if (e instanceof runtimeexception) { throw new runtimeexception(e.getlocalizedmessage()); } throw new runtimeexception("server exception"); } } /** * @author fu * @description 编写 redis lua 限流脚本 * @date 2020/4/8 13:24 */ public string buildluascript() { stringbuilder lua = new stringbuilder(); lua.append("local c"); lua.append("\nc = redis.call('get',keys[1])"); // 调用不超过最大值,则直接返回 lua.append("\nif c and tonumber(c) > tonumber(argv[1]) then"); lua.append("\nreturn c;"); lua.append("\nend"); // 执行计算器自加 lua.append("\nc = redis.call('incr',keys[1])"); lua.append("\nif tonumber(c) == 1 then"); // 从第一次调用开始限流,设置对应键值的过期 lua.append("\nredis.call('expire',keys[1],argv[2])"); lua.append("\nend"); lua.append("\nreturn c;"); return lua.tostring(); } /** * @author fu * @description 获取id地址 * @date 2020/4/8 13:24 */ public string getipaddress() { httpservletrequest request = ((servletrequestattributes) requestcontextholder.getrequestattributes()).getrequest(); string ip = request.getheader("x-forwarded-for"); if (ip == null || ip.length() == 0 || unknown.equalsignorecase(ip)) { ip = request.getheader("proxy-client-ip"); } if (ip == null || ip.length() == 0 || unknown.equalsignorecase(ip)) { ip = request.getheader("wl-proxy-client-ip"); } if (ip == null || ip.length() == 0 || unknown.equalsignorecase(ip)) { ip = request.getremoteaddr(); } return ip; } }
7、控制层实现
我们将@limit
注解作用在需要进行限流的接口方法上,下边我们给方法设置@limit
注解,在10秒
内只允许放行3个
请求,这里为直观一点用atomicinteger
计数。
/** * @author: fu * @description: */ @restcontroller public class limitercontroller { private static final atomicinteger atomic_integer_1 = new atomicinteger(); private static final atomicinteger atomic_integer_2 = new atomicinteger(); private static final atomicinteger atomic_integer_3 = new atomicinteger(); /** * @author fu * @description * @date 2020/4/8 13:42 */ @limit(key = "limittest", period = 10, count = 3) @getmapping("/limittest1") public int testlimiter1() { return atomic_integer_1.incrementandget(); } /** * @author fu * @description * @date 2020/4/8 13:42 */ @limit(key = "customer_limit_test", period = 10, count = 3, limittype = limittype.customer) @getmapping("/limittest2") public int testlimiter2() { return atomic_integer_2.incrementandget(); } /** * @author fu * @description * @date 2020/4/8 13:42 */ @limit(key = "ip_limit_test", period = 10, count = 3, limittype = limittype.ip) @getmapping("/limittest3") public int testlimiter3() { return atomic_integer_3.incrementandget(); } }
8、测试
测试预期:连续请求3次均可以成功,第4次请求被拒绝。接下来看一下是不是我们预期的效果,请求地址:http://127.0.0.1:8080/limittest1
,用postman
进行测试,有没有postman
url直接贴浏览器也是一样。
可以看到第四次请求时,应用直接拒绝了请求,说明我们的 springboot + aop + lua 限流方案搭建成功。
总结
以上 springboot + aop + lua
限流实现是比较简单的,旨在让大家认识下什么是限流?如何做一个简单的限流功能,面试要知道这是个什么东西。上面虽然说了几种实现限流的方案,但选哪种还要结合具体的业务场景,不能为了用而用。
到此这篇关于详解springboot+aop+lua分布式限流的最佳实践的文章就介绍到这了,更多相关springboot+aop+lua分布式限流内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!
下一篇: 电商怎么带货?为什么要做电商带货?