redis实现分布式接口限流
程序员文章站
2022-05-29 18:57:48
...
限流的目的是通过对并发访问/请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务。
限流包括两种:
1.Nginx接入层限流
按照一定的规则如帐号、IP、系统调用逻辑等在Nginx层面做限流
2.业务应用系统限流
通过业务代码控制流量这个流量可以被称为信号量,可以理解成是一种锁,它可以限制一项资源最多能同时被多少进程访问。
这里主要说说第二种使用redis在业务层进行限流。
公司做电商业务的,经常会发放优惠券,在某个点优惠券领取开放后,会有大量的并发流量进入,导致系统压力过大,为了限制并发流量,这里使用redis在业务层做了流量控制。
/**
* 如果用户在限流之外,则提示用户稍后再试
*/
int limitNum = 100;
long limitSecond = 1000;
if (redisUtil.acquireTokenFromBucket(limitNum, limitSecond) == null) {
logger.error("用户在限流之外,稍后再试");
return CommonResult.failed(-3,"用户在限流之外,稍后再试");
}
下面主要介绍一下redisUtil类中的这个方法(这个方法参考了别人的,原文请点击)
private static final String BUCKET = "BUCKET";
private static final String BUCKET_COUNT = "BUCKET_COUNT";
private static final String BUCKET_MONITOR = "BUCKET_MONITOR";
/**
* Redis实现分布式应用限流的方法
* @param limit 请求信号量
* @param timeout 请求时间
* @return
*/
public String acquireTokenFromBucket(int limit, long timeout) {
//这里获取jeids连接池,具体方法,可以自己实现
Jedis jedis = getJedis();
if(jedis == null){
return null;
}
//这里开始,是限流的方法
try{
String identifier = UUIDGenerator.generate32UUID();
long now = System.currentTimeMillis();
Transaction transaction = jedis.multi();
//删除信号量
transaction.zremrangeByScore(BUCKET_MONITOR.getBytes(), "-inf".getBytes(),
String.valueOf(now - timeout).getBytes());
ZParams params = new ZParams();
params.weightsByDouble(1.0,0.0);
transaction.zinterstore(BUCKET, params, BUCKET, BUCKET_MONITOR);
//计数器自增
transaction.incr(BUCKET_COUNT);
List<Object> results = transaction.exec();
long counter = (Long) results.get(results.size() - 1);
transaction = jedis.multi();
transaction.zadd(BUCKET_MONITOR, now, identifier);
transaction.zadd(BUCKET, counter, identifier);
transaction.zrank(BUCKET, identifier);
results = transaction.exec();
//获取排名,判断请求是否取得了信号量
long rank = (Long) results.get(results.size() - 1);
if (rank < limit) {
return identifier;
} else {//没有获取到信号量,清理之前放入redis 中垃圾数据
transaction = jedis.multi();
transaction.zrem(BUCKET_MONITOR, identifier);
transaction.zrem(BUCKET, identifier);
transaction.exec();
}
} catch (Exception e) {
log.error("在列表key的尾部插入元素失败:" + e.getMessage(), e);
returnBrokenResource(jedis);
} finally {
returnResource(jedis);
}
return null;
}
/**
* 释放jedis资源
*
* @param jedis
*/
@SuppressWarnings("deprecation")
public void returnResource(final Jedis jedis) {
if (jedis != null && jedisPool != null) {
jedisPool.returnResource(jedis);
}
}
@SuppressWarnings("deprecation")
public void returnBrokenResource(final Jedis jedis) {
if (jedis != null && jedisPool != null) {
jedisPool.returnBrokenResource(jedis);
}
}
业务层用法,如最开始的代码,通过返回判断是否为空,判断当前流量是否限流
以下是从网上找到的详细方法和测试
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;
import redis.clients.jedis.ZParams;
import java.util.List;
import java.util.UUID;
/**
* 实现类
*/
public class RedisRateLimiter {
private static final String BUCKET = "BUCKET";
private static final String BUCKET_COUNT = "BUCKET_COUNT";
private static final String BUCKET_MONITOR = "BUCKET_MONITOR";
static String acquireTokenFromBucket(
Jedis jedis, int limit, long timeout) {
String identifier = UUID.randomUUID().toString();
long now = System.currentTimeMillis();
Transaction transaction = jedis.multi();
//删除信号量
transaction.zremrangeByScore(BUCKET_MONITOR.getBytes(), "-inf".getBytes(), String.valueOf(now - timeout).getBytes());
ZParams params = new ZParams();
params.weightsByDouble(1.0,0.0);
transaction.zinterstore(BUCKET, params, BUCKET, BUCKET_MONITOR);
//计数器自增
transaction.incr(BUCKET_COUNT);
List<Object> results = transaction.exec();
long counter = (Long) results.get(results.size() - 1);
transaction = jedis.multi();
transaction.zadd(BUCKET_MONITOR, now, identifier);
transaction.zadd(BUCKET, counter, identifier);
transaction.zrank(BUCKET, identifier);
results = transaction.exec();
//获取排名,判断请求是否取得了信号量
long rank = (Long) results.get(results.size() - 1);
if (rank < limit) {
return identifier;
} else {//没有获取到信号量,清理之前放入redis 中垃圾数据
transaction = jedis.multi();
transaction.zrem(BUCKET_MONITOR, identifier);
transaction.zrem(BUCKET, identifier);
transaction.exec();
}
return null;
}
}
测试调用类
@GetMapping("/")
public void index(HttpServletResponse response) throws IOException {
Jedis jedis = jedisPool.getResource();
String token = RedisRateLimiter.acquireTokenFromBucket(jedis, LIMIT, TIMEOUT);
if (token == null) {
response.sendError(500);
}else{
//TODO 你的业务逻辑
}
jedisPool.returnResource(jedis);
}
此外还可以使用注解类+切面拦截来处理
@Configuration
static class WebMvcConfigurer extends WebMvcConfigurerAdapter {
private Logger logger = LoggerFactory.getLogger(WebMvcConfigurer.class);
@Autowired
private JedisPool jedisPool;
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new HandlerInterceptorAdapter() {
public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
Object handler) throws Exception {
HandlerMethod handlerMethod = (HandlerMethod) handler;
Method method = handlerMethod.getMethod();
RateLimiter rateLimiter = method.getAnnotation(RateLimiter.class);
if (rateLimiter != null){
int limit = rateLimiter.limit();
int timeout = rateLimiter.timeout();
Jedis jedis = jedisPool.getResource();
String token = RedisRateLimiter.acquireTokenFromBucket(jedis, limit, timeout);
if (token == null) {
response.sendError(500);
return false;
}
logger.debug("token -> {}",token);
jedis.close();
}
return true;
}
}).addPathPatterns("/*");
}
}
定义注解类
/**
* 限流注解
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RateLimiter {
int limit() default 5; //限流数量
int timeout() default 1000; //限流时间
}
接口使用,直接使用注解
@RateLimiter(limit = 2, timeout = 5000)
@GetMapping("/test")
public void test() {
}
测试结果
并发测试
工具:apache-jmeter-3.2
说明: 没有获取到信号量的接口返回500,status是红色,获取到信号量的接口返回200,status是绿色。
当限制请求信号量为2,并发5个线程:
当限制请求信号量为5,并发10个线程:
总结:
1.对于信号量的操作,使用事务操作。
2.不要使用时间戳作为信号量的排序分数,因为在分布式环境中,各个节点的时间差的原因,会出现不公平信号量的现象。
3.可以使用把这块代码抽成@rateLimiter注解,然后再方法上使用就会很方便
4.不同接口的流控,可以参考源码的里面RedisRateLimiterPlus,无非是每个接口生成一个监控参数