详解Spring Cloud Gateway 限流操作
开发高并发系统时有三把利器用来保护系统:缓存、降级和限流。
api网关作为所有请求的入口,请求量大,我们可以通过对并发访问的请求进行限速来保护系统的可用性。
常用的限流算法比如有令牌桶算法,漏桶算法,计数器算法等。
在zuul中我们可以自己去实现限流的功能 (zuul中如何限流在我的书 《spring cloud微服务-全栈技术与案例解析》 中有详细讲解) ,spring cloud gateway的出现本身就是用来替代zuul的。
要想替代那肯定得有强大的功能,除了性能上的优势之外,spring cloud gateway还提供了很多新功能,比如今天我们要讲的限流操作,使用起来非常简单,今天我们就来学习在如何在spring cloud gateway中进行限流操作。
目前限流提供了基于redis的实现,我们需要增加对应的依赖:
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-data-redis-reactive</artifactid> </dependency>
可以通过keyresolver来指定限流的key,比如我们需要根据用户来做限流,ip来做限流等等。
ip限流
@bean public keyresolver ipkeyresolver() { return exchange -> mono.just(exchange.getrequest().getremoteaddress().gethostname()); }
通过exchange对象可以获取到请求信息,这边用了hostname,如果你想根据用户来做限流的话这边可以获取当前请求的用户id或者用户名就可以了,比如:
用户限流
使用这种方式限流,请求路径中必须携带userid参数。
@bean keyresolver userkeyresolver() { return exchange -> mono.just(exchange.getrequest().getqueryparams().getfirst("userid")); }
接口限流
获取请求地址的uri作为限流key。
@bean keyresolver apikeyresolver() { return exchange -> mono.just(exchange.getrequest().getpath().value()); }
然后配置限流的过滤器信息:
server: port: 8084 spring: redis: host: 127.0.0.1 port: 6379 cloud: gateway: routes: - id: fsh-house uri: lb://fsh-house predicates: - path=/house/** filters: - name: requestratelimiter args: redis-rate-limiter.replenishrate: 10 redis-rate-limiter.burstcapacity: 20 key-resolver: "#{@ipkeyresolver}"
- filter名称必须是requestratelimiter
- redis-rate-limiter.replenishrate:允许用户每秒处理多少个请求
- redis-rate-limiter.burstcapacity:令牌桶的容量,允许在一秒钟内完成的最大请求数
- key-resolver:使用spel按名称引用bean
可以访问接口进行测试,这时候redis中会有对应的数据:
127.0.0.1:6379> keys *
1) "request_rate_limiter.{localhost}.timestamp"
2) "request_rate_limiter.{localhost}.tokens"
大括号中就是我们的限流key,这边是ip,本地的就是localhost
- timestamp:存储的是当前时间的秒数,也就是system.currenttimemillis() / 1000或者instant.now().getepochsecond()
- tokens:存储的是当前这秒钟的对应的可用的令牌数量
spring cloud gateway目前提供的限流还是相对比较简单的,在实际中我们的限流策略会有很多种情况,比如:
- 每个接口的限流数量不同,可以通过配置中心动态调整
- 超过的流量被拒绝后可以返回固定的格式给调用方
- 对某个服务进行整体限流(这个大家可以思考下用spring cloud gateway如何实现,其实很简单)
- ……
当然我们也可以通过重新redisratelimiter来实现自己的限流策略,这个我们后面再进行介绍。
限流源码
// routeid也就是我们的fsh-house,id就是限流的key,也就是localhost。 public mono<response> isallowed(string routeid, string id) { // 会判断redisratelimiter是否初始化了 if (!this.initialized.get()) { throw new illegalstateexception("redisratelimiter is not initialized"); } // 获取routeid对应的限流配置 config routeconfig = getconfig().getordefault(routeid, defaultconfig); if (routeconfig == null) { throw new illegalargumentexception("no configuration found for route " + routeid); } // 允许用户每秒做多少次请求 int replenishrate = routeconfig.getreplenishrate(); // 令牌桶的容量,允许在一秒钟内完成的最大请求数 int burstcapacity = routeconfig.getburstcapacity(); try { // 限流key的名称(request_rate_limiter.{localhost}.timestamp,request_rate_limiter.{localhost}.tokens) list<string> keys = getkeys(id); // the arguments to the lua script. time() returns unixtime in seconds. list<string> scriptargs = arrays.aslist(replenishrate + "", burstcapacity + "", instant.now().getepochsecond() + "", "1"); // allowed, tokens_left = redis.eval(script, keys, args) // 执行lua脚本 flux<list<long>> flux = this.redistemplate.execute(this.script, keys, scriptargs); // .log("redisratelimiter", level.finer); return flux.onerrorresume(throwable -> flux.just(arrays.aslist(1l, -1l))) .reduce(new arraylist<long>(), (longs, l) -> { longs.addall(l); return longs; }) .map(results -> { boolean allowed = results.get(0) == 1l; long tokensleft = results.get(1); response response = new response(allowed, getheaders(routeconfig, tokensleft)); if (log.isdebugenabled()) { log.debug("response: " + response); } return response; }); } catch (exception e) { log.error("error determining if user allowed from redis", e); } return mono.just(new response(true, getheaders(routeconfig, -1l))); }
lua脚本在:
local tokens_key = keys[1] local timestamp_key = keys[2] --redis.log(redis.log_warning, "tokens_key " .. tokens_key) local rate = tonumber(argv[1]) local capacity = tonumber(argv[2]) local now = tonumber(argv[3]) local requested = tonumber(argv[4]) local fill_time = capacity/rate local ttl = math.floor(fill_time*2) --redis.log(redis.log_warning, "rate " .. argv[1]) --redis.log(redis.log_warning, "capacity " .. argv[2]) --redis.log(redis.log_warning, "now " .. argv[3]) --redis.log(redis.log_warning, "requested " .. argv[4]) --redis.log(redis.log_warning, "filltime " .. fill_time) --redis.log(redis.log_warning, "ttl " .. ttl) local last_tokens = tonumber(redis.call("get", tokens_key)) if last_tokens == nil then last_tokens = capacity end --redis.log(redis.log_warning, "last_tokens " .. last_tokens) local last_refreshed = tonumber(redis.call("get", timestamp_key)) if last_refreshed == nil then last_refreshed = 0 end --redis.log(redis.log_warning, "last_refreshed " .. last_refreshed) local delta = math.max(0, now-last_refreshed) local filled_tokens = math.min(capacity, last_tokens+(delta*rate)) local allowed = filled_tokens >= requested local new_tokens = filled_tokens local allowed_num = 0 if allowed then new_tokens = filled_tokens - requested allowed_num = 1 end --redis.log(redis.log_warning, "delta " .. delta) --redis.log(redis.log_warning, "filled_tokens " .. filled_tokens) --redis.log(redis.log_warning, "allowed_num " .. allowed_num) --redis.log(redis.log_warning, "new_tokens " .. new_tokens) redis.call("setex", tokens_key, ttl, new_tokens) redis.call("setex", timestamp_key, ttl, now) return { allowed_num, new_tokens }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。