欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

redis实现分布式接口限流

程序员文章站 2022-05-29 18:57:48
...

限流的目的是通过对并发访问/请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务。

限流包括两种:
1.Nginx接入层限流
按照一定的规则如帐号、IP、系统调用逻辑等在Nginx层面做限流
2.业务应用系统限流
通过业务代码控制流量这个流量可以被称为信号量,可以理解成是一种锁,它可以限制一项资源最多能同时被多少进程访问。

这里主要说说第二种使用redis在业务层进行限流。

公司做电商业务的,经常会发放优惠券,在某个点优惠券领取开放后,会有大量的并发流量进入,导致系统压力过大,为了限制并发流量,这里使用redis在业务层做了流量控制。

  /**
         * 如果用户在限流之外,则提示用户稍后再试
         */
        int limitNum = 100;
        long limitSecond = 1000;
        if (redisUtil.acquireTokenFromBucket(limitNum, limitSecond) == null) {
            logger.error("用户在限流之外,稍后再试");
            return CommonResult.failed(-3,"用户在限流之外,稍后再试");
        }

下面主要介绍一下redisUtil类中的这个方法(这个方法参考了别人的,原文请点击


    private static final String BUCKET = "BUCKET";
    private static final String BUCKET_COUNT = "BUCKET_COUNT";
    private static final String BUCKET_MONITOR = "BUCKET_MONITOR";
    
 /**
     * Redis实现分布式应用限流的方法
     * @param limit     请求信号量
     * @param timeout   请求时间
     * @return
     */
    public String acquireTokenFromBucket(int limit, long timeout) {
        //这里获取jeids连接池,具体方法,可以自己实现
        Jedis jedis = getJedis();
        if(jedis == null){
            return null;
        }
        //这里开始,是限流的方法
        try{
            String identifier = UUIDGenerator.generate32UUID();
            long now = System.currentTimeMillis();
            Transaction transaction = jedis.multi();

            //删除信号量
            transaction.zremrangeByScore(BUCKET_MONITOR.getBytes(), "-inf".getBytes(),
                    String.valueOf(now - timeout).getBytes());
            ZParams params = new ZParams();
            params.weightsByDouble(1.0,0.0);
            transaction.zinterstore(BUCKET, params, BUCKET, BUCKET_MONITOR);

            //计数器自增
            transaction.incr(BUCKET_COUNT);
            List<Object> results = transaction.exec();
            long counter = (Long) results.get(results.size() - 1);

            transaction = jedis.multi();
            transaction.zadd(BUCKET_MONITOR, now, identifier);
            transaction.zadd(BUCKET, counter, identifier);
            transaction.zrank(BUCKET, identifier);
            results = transaction.exec();
            //获取排名,判断请求是否取得了信号量
            long rank = (Long) results.get(results.size() - 1);
            if (rank < limit) {
                return identifier;
            } else {//没有获取到信号量,清理之前放入redis 中垃圾数据
                transaction = jedis.multi();
                transaction.zrem(BUCKET_MONITOR, identifier);
                transaction.zrem(BUCKET, identifier);
                transaction.exec();
            }
        } catch (Exception e) {
            log.error("在列表key的尾部插入元素失败:" + e.getMessage(), e);
            returnBrokenResource(jedis);
        } finally {
            returnResource(jedis);
        }
        return null;
    }

    /**
     * 释放jedis资源
     *
     * @param jedis
     */
    @SuppressWarnings("deprecation")
    public  void returnResource(final Jedis jedis) {
        if (jedis != null && jedisPool != null) {
            jedisPool.returnResource(jedis);
        }
    }

    @SuppressWarnings("deprecation")
    public  void returnBrokenResource(final Jedis jedis) {
        if (jedis != null && jedisPool != null) {
            jedisPool.returnBrokenResource(jedis);
        }
    }

业务层用法,如最开始的代码,通过返回判断是否为空,判断当前流量是否限流


以下是从网上找到的详细方法和测试

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;
import redis.clients.jedis.ZParams;
import java.util.List;
import java.util.UUID;
 
/**
 * 实现类
 */
public class RedisRateLimiter {
  private static final String BUCKET = "BUCKET";
  private static final String BUCKET_COUNT = "BUCKET_COUNT";
  private static final String BUCKET_MONITOR = "BUCKET_MONITOR";
 
  static String acquireTokenFromBucket(
      Jedis jedis, int limit, long timeout) {
    String identifier = UUID.randomUUID().toString();
    long now = System.currentTimeMillis();
    Transaction transaction = jedis.multi();
 
    //删除信号量
    transaction.zremrangeByScore(BUCKET_MONITOR.getBytes(), "-inf".getBytes(), String.valueOf(now - timeout).getBytes());
    ZParams params = new ZParams();
    params.weightsByDouble(1.0,0.0);
    transaction.zinterstore(BUCKET, params, BUCKET, BUCKET_MONITOR);
 
    //计数器自增
    transaction.incr(BUCKET_COUNT);
    List<Object> results = transaction.exec();
    long counter = (Long) results.get(results.size() - 1);
 
    transaction = jedis.multi();
    transaction.zadd(BUCKET_MONITOR, now, identifier);
    transaction.zadd(BUCKET, counter, identifier);
    transaction.zrank(BUCKET, identifier);
    results = transaction.exec();
    //获取排名,判断请求是否取得了信号量
    long rank = (Long) results.get(results.size() - 1);
    if (rank < limit) {
      return identifier;
    } else {//没有获取到信号量,清理之前放入redis 中垃圾数据
      transaction = jedis.multi();
      transaction.zrem(BUCKET_MONITOR, identifier);
      transaction.zrem(BUCKET, identifier);
      transaction.exec();
    }
    return null;
  }
}

测试调用类

@GetMapping("/")
public void index(HttpServletResponse response) throws IOException {
  Jedis jedis = jedisPool.getResource();
  String token = RedisRateLimiter.acquireTokenFromBucket(jedis, LIMIT, TIMEOUT);
  if (token == null) {
    response.sendError(500);
  }else{
    //TODO 你的业务逻辑
  }
  jedisPool.returnResource(jedis);
}

此外还可以使用注解类+切面拦截来处理

@Configuration
static class WebMvcConfigurer extends WebMvcConfigurerAdapter {
  private Logger logger = LoggerFactory.getLogger(WebMvcConfigurer.class);
  @Autowired
  private JedisPool jedisPool;
 
  public void addInterceptors(InterceptorRegistry registry) {
    registry.addInterceptor(new HandlerInterceptorAdapter() {
      public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
                   Object handler) throws Exception {
        HandlerMethod handlerMethod = (HandlerMethod) handler;
        Method method = handlerMethod.getMethod();
        RateLimiter rateLimiter = method.getAnnotation(RateLimiter.class);
        if (rateLimiter != null){
          int limit = rateLimiter.limit();
          int timeout = rateLimiter.timeout();
          Jedis jedis = jedisPool.getResource();
          String token = RedisRateLimiter.acquireTokenFromBucket(jedis, limit, timeout);
          if (token == null) {
            response.sendError(500);
            return false;
          }
          logger.debug("token -> {}",token);
          jedis.close();
        }
        return true;
      }
    }).addPathPatterns("/*");
  }
}

定义注解类

/**
 * 限流注解
 */
 
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RateLimiter {
  int limit() default 5; //限流数量
  int timeout() default 1000; //限流时间
}

接口使用,直接使用注解

@RateLimiter(limit = 2, timeout = 5000)
@GetMapping("/test")
public void test() {
}

测试结果
并发测试

工具:apache-jmeter-3.2
说明: 没有获取到信号量的接口返回500,status是红色,获取到信号量的接口返回200,status是绿色。
当限制请求信号量为2,并发5个线程:
redis实现分布式接口限流

当限制请求信号量为5,并发10个线程:
redis实现分布式接口限流

总结:
1.对于信号量的操作,使用事务操作。
2.不要使用时间戳作为信号量的排序分数,因为在分布式环境中,各个节点的时间差的原因,会出现不公平信号量的现象。
3.可以使用把这块代码抽成@rateLimiter注解,然后再方法上使用就会很方便
4.不同接口的流控,可以参考源码的里面RedisRateLimiterPlus,无非是每个接口生成一个监控参数

上一篇: NPM 使用介绍

下一篇: mysql多表查询