java api 接口限流
一,前言
高并发下,api接口面临压力过大的情况。针对此类情况,一般有一下几种处理方案。
缓存
缓存的目的是提升系统访问速度和增大系统处理容量
降级
降级是当服务出现问题或者影响到核心流程时,需要暂时屏蔽掉,待高峰或者问题解决后再打开
限流
限流的目的是通过对并发访问/请求进行限速,或者对一个时间窗口内的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务、排队或等待、降级等处理
一般开发高并发系统常见的限流模式有控制并发和控制速率,一个是限制并发的总数量(比如数据库连接池、线程池),一个是限制并发访问的速率(如nginx的limit_conn模块,用来限制瞬时并发连接数),另外还可以限制单位时间窗口内的请求数量(如Guava的RateLimiter、nginx的limit_req模块,限制每秒的平均速率)。其他还有如限制远程接口调用速率、限制MQ的消费速率。另外还可以根据网络连接数、网络流量、CPU或内存负载等来限流。
相关概念
PV:page view 页面总访问量,每刷新一次记录一次。
UV:unique view 客户端主机访问,指一天内相同IP的访问记为1次。
QPS:query per second,即每秒访问量。qps很大程度上代表了系统的繁忙度,没次请求可能存在多次的磁盘io,网络请求,多个cpu时间片,一旦qps超过了预先设置的阀值,可以考量扩容增加服务器,避免访问量过大导致的宕机。
RT:response time,每次请求的响应时间,直接决定用户体验性。
光说不练假把式。下面来具体的实现下。
二,实现
控制并发和控制速率两种方法的使用。
package com.g7go.safe.other;
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import static java.lang.Thread.currentThread;
/**
* 关于限流 目前存在两大类,从线程个数(jdk1.5 Semaphore)和RateLimiter速率(guava)
* Semaphore:从线程个数限流
* RateLimiter:从速率限流 目前常见的算法是漏桶算法和令牌算法,下面会具体介绍
*
* @author lwc
* @version 1.0
**/
public class RateLimiterExample {
/**
* Guava 0.5的意思是 1秒中0.5次的操作,2秒1次的操作 从速度来限流,从每秒中能够执行的次数来
*/
private final static RateLimiter limiter = RateLimiter.create(0.5d);
/**
* 同时只能有三个线程工作 Java1.5 从同时处理的线程个数来限流
*/
private final static Semaphore sem = new Semaphore(3);
private static void testSemaphore() {
try {
sem.acquire();
System.out.println(currentThread().getName() + " is doing work...");
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
sem.release();
System.out.println(currentThread().getName() + " release the semephore..other thread can get and do job");
}
}
public static void runTestSemaphore() {
ExecutorService service = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
service.submit(RateLimiterExample::testSemaphore);
}
}
/**
* Guava的RateLimiter
*/
private static void testLimiter() {
System.out.println(currentThread().getName() + " waiting " + limiter.acquire());
}
/**
* Guava的RateLimiter
*/
public static void runTestLimiter() {
ExecutorService service = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
service.submit(RateLimiterExample::testLimiter);
}
}
public static void main(String[] args) {
// runTestSemaphore();
runTestLimiter();
}
}
针对控制并发访问的速率,一般会采用令牌桶或者漏铜算法,下面分别实现
漏桶
package com.g7go.safe.other;
import com.google.common.util.concurrent.Monitor;
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import static java.lang.Thread.currentThread;
/**
* 实现漏桶算法 实现多线程生产者消费者模型 限流
*
* @author lwc
* @version 1.0
**/
public class Bucket {
/**
* 桶
*/
private final ConcurrentLinkedQueue<Integer> container = new ConcurrentLinkedQueue<>();
/**
* 定义桶的大小
*/
private final static int BUCKET_LIMIT = 1000;
/**
* 消费者 不论多少个线程,每秒最大的处理能力是1秒中执行10次
*/
private final RateLimiter consumerRate = RateLimiter.create(10d);
/**
* 往桶里面放数据时,确认没有超过桶的最大的容量
*/
private Monitor offerMonitor = new Monitor();
/**
* 从桶里消费数据时,桶里必须存在数据
*/
private Monitor consumerMonitor = new Monitor();
/**
* 往桶里面写数据
*
* @param data
*/
public void submit(Integer data) {
if (offerMonitor.enterIf(offerMonitor.newGuard(() -> container.size() < BUCKET_LIMIT))) {
try {
container.offer(data);
System.out.println(currentThread() + " submit.." + data + " container size is :[" + container.size() + "]");
} finally {
offerMonitor.leave();
}
} else {
//这里时候采用降级策略了。消费速度跟不上产生速度时,而且桶满了,抛出异常
//或者存入MQ DB等后续处理
throw new IllegalStateException(currentThread().getName() + "The bucket is ful..Pls latter can try...");
}
}
/**
* 从桶里面消费数据
*
* @param consumer
*/
public void takeThenConsumer(Consumer<Integer> consumer) {
if (consumerMonitor.enterIf(consumerMonitor.newGuard(() -> !container.isEmpty()))) {
try {
System.out.println(currentThread() + " waiting" + consumerRate.acquire());
Integer data = container.poll();
consumer.accept(data);
} finally {
consumerMonitor.leave();
}
} else {
//当木桶的消费完后,可以消费那些降级存入MQ或者DB里面的数据
System.out.println("will consumer Data from MQ...");
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
漏桶测试
package com.g7go.safe.other;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import static java.lang.Thread.currentThread;
/**
* 漏桶算法测试
* 实现漏桶算法 实现多线程生产者消费者模型 限流
*
* @author lwc
*/
public class BuckerTest {
public static void main(String[] args) {
final Bucket bucket = new Bucket();
final AtomicInteger dataCreator = new AtomicInteger(0);
//生产线程 10个线程 每秒提交 50个数据 1/0.2s*10=50个
IntStream.range(0, 10).forEach(i -> {
new Thread(() -> {
for (; ; ) {
int data = dataCreator.incrementAndGet();
try {
bucket.submit(data);
TimeUnit.MILLISECONDS.sleep(200);
} catch (Exception e) {
//对submit时,如果桶满了可能会抛出异常
if (e instanceof IllegalStateException) {
System.out.println(e.getMessage());
//当满了后,生产线程就休眠1分钟
try {
TimeUnit.SECONDS.sleep(60);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
}
}).start();
});
//消费线程 采用RateLimiter每秒处理10个 综合的比率是5:1
IntStream.range(0, 10).forEach(i -> {
new Thread(
() -> {
for (; ; ) {
bucket.takeThenConsumer(x -> {
System.out.println(currentThread() + "C.." + x);
});
}
}
).start();
});
}
}
三,项目中的应用
至于令牌桶我们直接在项目中实现
编写接口
package com.g7go.safe;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author Mr_Lee
* @date 2019-12-25 09:02
*/
@RestController
public class SafeController {
@GetMapping("getData")
public String getData() throws InterruptedException {
Thread.sleep(2000);
System.out.println("This is Data");
return "This is Data";
}
}
定义拦截器
package com.g7go.safe.safe;
import org.springframework.web.servlet.handler.HandlerInterceptorAdapter;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/**
* @author lwc
*/
public abstract class AbstractInterceptor extends HandlerInterceptorAdapter {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
return preHandle(request);
}
protected abstract boolean preHandle(HttpServletRequest request);
}
package com.g7go.safe.safe;
import com.google.common.util.concurrent.RateLimiter;
import org.springframework.stereotype.Component;
import javax.servlet.http.HttpServletRequest;
/**
* @author lwc
*/
@Component
public class RateLimitInterceptor extends AbstractInterceptor {
public static final int REQUEST_COUNT = 1;
private static final RateLimiter rateLimiter = RateLimiter.create(REQUEST_COUNT);
@Override
protected boolean preHandle(HttpServletRequest request) {
if (!rateLimiter.tryAcquire()) {
System.out.println(">>>>>>>>>> 亲!请稍后重试!");
return false;
}
return true;
}
}
注册拦截器
package com.g7go.safe.safe;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
/**
* 添加拦截器
*
* @author lwc
*/
@Component
public class WebMvcConfig implements WebMvcConfigurer {
private final RateLimitInterceptor rateLimitInterceptor;
public WebMvcConfig(RateLimitInterceptor rateLimitInterceptor) {
this.rateLimitInterceptor = rateLimitInterceptor;
}
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(rateLimitInterceptor).addPathPatterns("/**");
}
}
当然这里默认拦截所有的接口,也可以通过正则或者注解声明需要拦截的接口,这里就不再赘述。
启动项目测试。
都在意料之中。
本文章涉及到谷歌的guava的应用。不懂之处自行查看文档。
上一篇: 两种方式实现二叉树BFS层序遍历
下一篇: iOS探索--点语法和对象初始化