自从使用了Semaphore,再也不用担心接口并发频率限制了
今天在写代码时遇到一个问题,在对接shopify电商平台api接口时,使用多线程并发处理数据返回了一个报错:
Exceeded 2 calls per second for api client. Reduce request rates to resume uninterrupted service.
平台接口有频率限制一秒钟只能够请求两次,多线程并发处理,导致大量数据处理失败,咋整?
按照我们以前的思维,这里肯定不适合使用多线程,乖乖换回单线程处理,并且每个请求睡眠0.5s。
但是作为一个多年经验的老手,这样做是不是有点太low了,而且效率上也大打折扣,单线程请求,必须要求当前循
环体代码全部处理完毕才能够进行下一次处理,也就是我们必须等待平台接口返回结果,并且处理完毕才能够进行下
一次处理,这样如果请求有延迟,我们的单次请求肯定是大于0.5s的,无法充分利用接口频率。
1、使用Semaphore
Semaphore是什么,能做什么?
Semaphore 是 synchronized 的加强版,作用是控制线程的并发数量。就这一点而言,单纯的synchronized 关键字是实现不了的。
相比于也就是说synchronized同一时刻只允许一个线程访问代码块,Semaphore可以允许有两个甚至是更多的线程访问到代码块。
获得一个项目之前,每个线程必须从信号量获取许可,保证项目可供使用。 当线程已经与该项目完成了它返回到池中并将许可返回到该信号量,允许另一个线程获取该项目。
注意,当没有同步锁定保持在acquire被称为这将阻止返回到池中的项目。 信号量封装了从维持池本身的一致性所需的任何同步限制访问池,需要单独的同步。
当信号量初始化为一个,并且用于使得其仅至多有一个允许使用,可以作为一个互斥锁。(等同于synchronized作用)
这通常被称为二进制信号量 ,因为它只能有两种状态:一个可用的许可,或零个许可可用。 当以这种方式使用的,二进制信号量的属性(不像许多java.util.concurrent.locks.Lock实现),谓之“锁”可以通过所有者以外的一个线程被释放(因为信号量没有所有权的概念)。
package com.example.demo.service;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
class TestServiceTest {
// 同步关键类,构造方法传入的数字是多少,则同一个时刻,只运行多少个线程同时运行指定代码
//当前设置为 2 表示只能够同时有两个线程 进入代码块
private static Semaphore semaphore = new Semaphore(2);
public static void main(String[] args) {
//初始时间
long start = System.currentTimeMillis();
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
final int ii = i;
executorService.submit(new Runnable() {
@Override
public void run() {
try {
//同时进来两个线程 上锁 并休眠一秒
semaphore.acquire();
Thread.sleep(1000);
//一秒后释放 新的线程可以进入
semaphore.release();
System.out.println("i="+ii+",进来了"+Thread.currentThread().getName()+","+(System.currentTimeMillis()-start)+"ms");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
executorService.shutdown();
}
}
看一下上面的代码:
- 我们初始化Semaphore 信号灯的访问权重是2,代表同一时刻可以有最多两个线程访问到我们的类部类业务代码块。
private static Semaphore semaphore = new Semaphore(2);
- 默认消耗权重为1,进来两个线程,上锁,休眠一秒后,请求到业务接口。
//同时进来两个线程 上锁 并休眠一秒
semaphore.acquire();
Thread.sleep(1000);
//一秒后释放 新的线程可以进入
semaphore.release();
代表一秒钟内,我们只允许两个线程进来,下一个操作需要等待当前线程释放锁,也就是semaphore.release()
;
这样我们去请求接口的时候不需要去等待返回结果,只要保证一秒钟内进来两个线程就行了,可以完美的利用到接口的限制频率。
i=0,进来了pool-1-thread-1,1007ms
i=1,进来了pool-1-thread-2,1007ms
i=2,进来了pool-1-thread-3,2007ms
i=3,进来了pool-1-thread-4,2007ms
i=5,进来了pool-1-thread-6,3008ms
i=4,进来了pool-1-thread-5,3008ms
i=6,进来了pool-1-thread-7,4008ms
i=7,进来了pool-1-thread-8,4008ms
i=8,进来了pool-1-thread-9,5009ms
i=9,进来了pool-1-thread-10,5009ms
2.方法 acquire( int permits ) 参数作用
注意:new Semaphore(2);
这里的参数2,代表的是权重,如果我们线程中的代码进行修改如下:
//消耗掉两个
semaphore.acquire(2);
Thread.sleep(1000);
//一秒后释放两个 新的线程可以进入
semaphore.release(2);
i=0,进来了pool-1-thread-1,1006ms
i=1,进来了pool-1-thread-2,2007ms
i=2,进来了pool-1-thread-3,3007ms
i=3,进来了pool-1-thread-4,4008ms
i=4,进来了pool-1-thread-5,5008ms
变成了一秒钟只有一个线程进来了,我们的代码进入线程数量,取决于初始化权重值,与线程消耗权重的比例。
//消耗掉2个
semaphore.acquire(2);
Thread.sleep(1000);
//一秒后释放一个
semaphore.release(1);
如果改成上面的线程进来消耗掉两个,只释放了一个,那么后续线程永远都将无法执行。最终导致程序卡死。
i=0,进来了pool-1-thread-1,1005ms
三、总结
1.使用Semaphore信号灯可以控制进入代码块的线程数量。
2.代码块中semaphore.acquire(2);消耗了多少权重,后边就要归还多少权重,否则会导致程序异常卡死。
本文地址:https://blog.csdn.net/flycp/article/details/107487399