欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

java api 接口限流

程序员文章站 2022-03-26 16:49:17
...

一,前言

高并发下,api接口面临压力过大的情况。针对此类情况,一般有一下几种处理方案。

缓存 缓存的目的是提升系统访问速度和增大系统处理容量

降级 降级是当服务出现问题或者影响到核心流程时,需要暂时屏蔽掉,待高峰或者问题解决后再打开

限流 限流的目的是通过对并发访问/请求进行限速,或者对一个时间窗口内的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务、排队或等待、降级等处理

  一般开发高并发系统常见的限流模式有控制并发和控制速率,一个是限制并发的总数量(比如数据库连接池、线程池),一个是限制并发访问的速率(如nginx的limit_conn模块,用来限制瞬时并发连接数),另外还可以限制单位时间窗口内的请求数量(如Guava的RateLimiter、nginx的limit_req模块,限制每秒的平均速率)。其他还有如限制远程接口调用速率、限制MQ的消费速率。另外还可以根据网络连接数、网络流量、CPU或内存负载等来限流。

相关概念
PV:page view 页面总访问量,每刷新一次记录一次。

UV:unique view 客户端主机访问,指一天内相同IP的访问记为1次。

QPS:query per second,即每秒访问量。qps很大程度上代表了系统的繁忙度,没次请求可能存在多次的磁盘io,网络请求,多个cpu时间片,一旦qps超过了预先设置的阀值,可以考量扩容增加服务器,避免访问量过大导致的宕机。

RT:response time,每次请求的响应时间,直接决定用户体验性。

光说不练假把式。下面来具体的实现下。

二,实现

控制并发和控制速率两种方法的使用。

package com.g7go.safe.other;

import com.google.common.util.concurrent.RateLimiter;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

import static java.lang.Thread.currentThread;

/**
 * 关于限流 目前存在两大类,从线程个数(jdk1.5 Semaphore)和RateLimiter速率(guava)
 * Semaphore:从线程个数限流
 * RateLimiter:从速率限流  目前常见的算法是漏桶算法和令牌算法,下面会具体介绍
 *
 * @author lwc
 * @version 1.0
 **/
public class RateLimiterExample {

    /**
     * Guava  0.5的意思是 1秒中0.5次的操作,2秒1次的操作  从速度来限流,从每秒中能够执行的次数来
     */
    private final static RateLimiter limiter = RateLimiter.create(0.5d);

    /**
     * 同时只能有三个线程工作 Java1.5  从同时处理的线程个数来限流
     */
    private final static Semaphore sem = new Semaphore(3);

    private static void testSemaphore() {
        try {
            sem.acquire();
            System.out.println(currentThread().getName() + " is doing work...");
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            sem.release();
            System.out.println(currentThread().getName() + " release the semephore..other thread can get and do job");
        }
    }

    public static void runTestSemaphore() {
        ExecutorService service = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
            service.submit(RateLimiterExample::testSemaphore);
        }
    }

    /**
     * Guava的RateLimiter
     */
    private static void testLimiter() {
        System.out.println(currentThread().getName() + " waiting  " + limiter.acquire());
    }

    /**
     * Guava的RateLimiter
     */
    public static void runTestLimiter() {
        ExecutorService service = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
            service.submit(RateLimiterExample::testLimiter);
        }
    }


    public static void main(String[] args) {
//        runTestSemaphore();
        runTestLimiter();
    }
}

针对控制并发访问的速率,一般会采用令牌桶或者漏铜算法,下面分别实现

漏桶

package com.g7go.safe.other;

import com.google.common.util.concurrent.Monitor;
import com.google.common.util.concurrent.RateLimiter;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import static java.lang.Thread.currentThread;

/**
 * 实现漏桶算法 实现多线程生产者消费者模型 限流
 *
 * @author lwc
 * @version 1.0
 **/
public class Bucket {

    /**
     * 桶
     */
    private final ConcurrentLinkedQueue<Integer> container = new ConcurrentLinkedQueue<>();

    /**
     * 定义桶的大小
     */
    private final static int BUCKET_LIMIT = 1000;

    /**
     * 消费者 不论多少个线程,每秒最大的处理能力是1秒中执行10次
     */
    private final RateLimiter consumerRate = RateLimiter.create(10d);

    /**
     * 往桶里面放数据时,确认没有超过桶的最大的容量
     */
    private Monitor offerMonitor = new Monitor();

    /**
     * 从桶里消费数据时,桶里必须存在数据
     */
    private Monitor consumerMonitor = new Monitor();


    /**
     * 往桶里面写数据
     *
     * @param data
     */
    public void submit(Integer data) {
        if (offerMonitor.enterIf(offerMonitor.newGuard(() -> container.size() < BUCKET_LIMIT))) {
            try {
                container.offer(data);
                System.out.println(currentThread() + " submit.." + data + " container size is :[" + container.size() + "]");
            } finally {
                offerMonitor.leave();
            }
        } else {
            //这里时候采用降级策略了。消费速度跟不上产生速度时,而且桶满了,抛出异常
            //或者存入MQ DB等后续处理
            throw new IllegalStateException(currentThread().getName() + "The bucket is ful..Pls latter can try...");
        }
    }


    /**
     * 从桶里面消费数据
     *
     * @param consumer
     */
    public void takeThenConsumer(Consumer<Integer> consumer) {
        if (consumerMonitor.enterIf(consumerMonitor.newGuard(() -> !container.isEmpty()))) {
            try {
                System.out.println(currentThread() + "  waiting" + consumerRate.acquire());
                Integer data = container.poll();
                consumer.accept(data);
            } finally {
                consumerMonitor.leave();
            }
        } else {
            //当木桶的消费完后,可以消费那些降级存入MQ或者DB里面的数据
            System.out.println("will consumer Data from MQ...");
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

漏桶测试

package com.g7go.safe.other;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

import static java.lang.Thread.currentThread;

/**
 * 漏桶算法测试
 * 实现漏桶算法 实现多线程生产者消费者模型 限流
 *
 * @author lwc
 */
public class BuckerTest {

    public static void main(String[] args) {
        final Bucket bucket = new Bucket();
        final AtomicInteger dataCreator = new AtomicInteger(0);

        //生产线程 10个线程 每秒提交 50个数据  1/0.2s*10=50个
        IntStream.range(0, 10).forEach(i -> {
            new Thread(() -> {
                for (; ; ) {
                    int data = dataCreator.incrementAndGet();
                    try {
                        bucket.submit(data);
                        TimeUnit.MILLISECONDS.sleep(200);
                    } catch (Exception e) {
                        //对submit时,如果桶满了可能会抛出异常
                        if (e instanceof IllegalStateException) {
                            System.out.println(e.getMessage());
                            //当满了后,生产线程就休眠1分钟
                            try {
                                TimeUnit.SECONDS.sleep(60);
                            } catch (InterruptedException e1) {
                                e1.printStackTrace();
                            }
                        }
                    }
                }
            }).start();
        });

        //消费线程  采用RateLimiter每秒处理10个  综合的比率是5:1
        IntStream.range(0, 10).forEach(i -> {
            new Thread(
                    () -> {
                        for (; ; ) {
                            bucket.takeThenConsumer(x -> {
                                System.out.println(currentThread() + "C.." + x);
                            });
                        }
                    }
            ).start();
        });
    }
}

三,项目中的应用

至于令牌桶我们直接在项目中实现

编写接口

package com.g7go.safe;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author Mr_Lee
 * @date 2019-12-25 09:02
 */
@RestController
public class SafeController {

    @GetMapping("getData")
    public String getData() throws InterruptedException {
        Thread.sleep(2000);
        System.out.println("This is Data");
        return "This is Data";
    }

}

定义拦截器

package com.g7go.safe.safe;

import org.springframework.web.servlet.handler.HandlerInterceptorAdapter;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/**
 * @author lwc
 */
public abstract class AbstractInterceptor extends HandlerInterceptorAdapter {

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        return preHandle(request);
    }

    protected abstract boolean preHandle(HttpServletRequest request);
}
package com.g7go.safe.safe;

import com.google.common.util.concurrent.RateLimiter;
import org.springframework.stereotype.Component;

import javax.servlet.http.HttpServletRequest;

/**
 * @author lwc
 */
@Component
public class RateLimitInterceptor extends AbstractInterceptor {

    public static final int REQUEST_COUNT = 1;

    private static final RateLimiter rateLimiter = RateLimiter.create(REQUEST_COUNT);

    @Override
    protected boolean preHandle(HttpServletRequest request) {
        if (!rateLimiter.tryAcquire()) {
            System.out.println(">>>>>>>>>> 亲!请稍后重试!");
            return false;
        }
        return true;
    }
}

注册拦截器

package com.g7go.safe.safe;

import org.springframework.stereotype.Component;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

/**
 * 添加拦截器
 *
 * @author lwc
 */
@Component
public class WebMvcConfig implements WebMvcConfigurer {

    private final RateLimitInterceptor rateLimitInterceptor;

    public WebMvcConfig(RateLimitInterceptor rateLimitInterceptor) {
        this.rateLimitInterceptor = rateLimitInterceptor;
    }

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(rateLimitInterceptor).addPathPatterns("/**");
    }
}

当然这里默认拦截所有的接口,也可以通过正则或者注解声明需要拦截的接口,这里就不再赘述。

启动项目测试。

java api 接口限流

都在意料之中。

本文章涉及到谷歌的guava的应用。不懂之处自行查看文档。

 

相关标签: tool