[C++]阻塞和非阻塞的队列的性能对比(Non-BlockingQueue&BlockingQueue benchmark)
关键字:阻塞队列、非阻塞队列、性能对比、Non-BlockingQueue、BlockingQueue、benchmark、Performance
原文作者:@玄冬Wong
转载请注明出处:http://aigo.iteye.com/blog/2292169
对阻塞和非阻塞队列进行测试:
1,用boost::lockfree::spsc_queue非阻塞队列;(注:spsc_queue是单个生产者单个消费者线程安全的队列,多个生产者多个消费者线程安全的boost::lockfree::queue理论上没有spsc_queue性能好,当然实际上也是这样。。)
2,使用boost::mutex、boost::condition_variable实现的阻塞队列;
先公布结果:
1,如果队列容量足够大,那么非阻塞队列完爆阻塞队列,速度领先至少一个数量级;
2,如果队列容量相对数据总量较小,那么阻塞队列完爆非阻塞队列,速度领先也是一个数量级以上;
现在的服务器动辄32G内存,至于哪种队列更适合服务器,你懂的:)
测试代码:
#include <windows.h> #include <iostream> #include <time.h> #include <thread> #include <list> #include <boost/lockfree/spsc_queue.hpp> #include <boost/thread/mutex.hpp> #include <boost/thread/condition_variable.hpp> #include <boost/circular_buffer.hpp> #define LOOP_COUNT 10000000 #define QUEUE_CAPACITY 10000000 boost::condition_variable cv; boost::mutex m; boost::circular_buffer<int> cb(QUEUE_CAPACITY); boost::lockfree::spsc_queue<int, boost::lockfree::capacity<QUEUE_CAPACITY>> spscq; int count = 0; void blocking_productor() { for (int i = 0; i < LOOP_COUNT; i++) { boost::unique_lock<boost::mutex> lock(m); while (cb.size() == QUEUE_CAPACITY) { cv.wait(lock); } cb.push_back(i); cv.notify_one(); } } void blocking_customer() { while (1) { boost::unique_lock<boost::mutex> lock(m); while (cb.size() == 0) { cv.wait(lock); } cb.pop_front(); cv.notify_one(); if (++count >= LOOP_COUNT) { break; } } } void nonblocking_productor() { for (int i = 0; i < LOOP_COUNT; i++) { while (spscq.write_available() == 0) { Sleep(1); } spscq.push(i); } } void nonblocking_customer() { while (1) { if (spscq.pop()) { ++count; } else { Sleep(1); } if (count >= LOOP_COUNT) { break; } } } void test_blocking() { clock_t start = clock(); std::thread *bc_t = new std::thread((&blocking_customer)); std::thread *bp_t = new std::thread((&blocking_productor)); bc_t->join(); bp_t->join(); clock_t end = clock(); printf("[test_blocking]\nQUEUE_CAPACITY:%d\ncost:%dms\n", QUEUE_CAPACITY, end - start); printf("count:%d\n", count); delete bc_t; delete bp_t; } void test_nonblocking() { clock_t start = clock(); std::thread *nc_t = new std::thread((&nonblocking_customer)); std::thread *np_t = new std::thread((&nonblocking_productor)); nc_t->join(); np_t->join(); clock_t end = clock(); printf("[test_nonblocking]\nQUEUE_CAPACITY:%d\ncost:%dms\n", QUEUE_CAPACITY, end - start); printf("count:%d\n", count); delete nc_t; delete np_t; } int main(char* args, int size) { //为了排除测试程序的无关因素,测试时只开启一个:blocking或者nonblocking。 //test_blocking(); test_nonblocking(); }
不同队列容量的测试结果:
读写线程分别执行一千万次的写入和读取操作
阻塞
[test_blocking]
QUEUE_CAPACITY:10
cost:13758ms
count:10000000
[test_blocking]
QUEUE_CAPACITY:1000
cost:1320ms
count:10000000
[test_blocking]
QUEUE_CAPACITY:10000000
cost:1178ms
count:10000000
非阻塞
[test_nonblocking]
QUEUE_CAPACITY:1000
cost:20330ms
count:10000000
[test_nonblocking]
QUEUE_CAPACITY:10000
cost:1827ms
count:10000000
[test_nonblocking]
QUEUE_CAPACITY:100000
cost:195ms
count:10000000
[test_nonblocking]
QUEUE_CAPACITY:1000000
cost:92ms
count:10000000
[test_nonblocking]
QUEUE_CAPACITY:10000000
cost:90ms
count:10000000
总结:
读写线程分别执行一千万次的写入和读取操作,
阻塞模式下,1000的队列容量就可以将性能发挥到极限:耗时1200毫秒左右;
非阻塞模式下,一百万的队列容量才能将性能发挥到极限:耗时90毫秒以内;
测试环境:
boost 1.60
windows 10 pro x64
VS2015企业版 update2,release x64
CPU:i7二代移动版
补充内容:
1,阻塞和非阻塞的适用环境
这篇文章讲得比较好:如果队列空闲状态占多数,那么阻塞队列性能更好,如果队列有待读取元素的情况占多数,那么非阻塞的无锁队列性能更好。
boost c++ lock-free queue vs shared queue
http://*.com/questions/16275112/boost-c-lock-free-queue-vs-shared-queue
上面测试情况中,如果队列容量较小,那么读写两个线程大部分时间会是在相互等待;如果容量足够大,那么两个线程可以一路狂奔。
2,java中的阻塞和非阻塞实现
java中的LinkedBlockingQueue,其内部也是通过ReentrantLock和Condition实现阻塞的,上面的结论同样适用于java的相关API。
java中如果要使用无锁队列,可以使用ConcurrentLinkedQueue,其功能相当于boost::lockfree::queue,可支持多个生产者多个消费者的线程。
无锁和非阻塞当以当作同一种概念:阻塞肯定需要锁实现,而非阻塞可以无锁(wait-free)。
3,std::mutex和std::condition_variable的测试结果
将阻塞摸下的相关boost接口换成了std,结果还是一样的,区别不大。
修改代码如下:
#include <mutex> std::mutex m; std::condition_variable cv; void blocking_productor() { for (int i = 0; i < LOOP_COUNT; i++) { std::unique_lock<std::mutex> lk(m); cv.wait(lk, [] {return cb.size() < QUEUE_CAPACITY; }); cb.push_back(i); cv.notify_one(); } } void blocking_customer() { while (1) { std::unique_lock<std::mutex> lk(m); cv.wait(lk, [] {return cb.size() > 0; }); cb.pop_front(); cv.notify_one(); if (++count >= LOOP_COUNT) { break; } } }
输出结果:
[test_blocking]
QUEUE_CAPACITY:1000
cost:1557ms
count:10000000
[test_blocking]
QUEUE_CAPACITY:10000000
cost:1081ms
count:10000000
4,spsc_queue不一定是容量越大性能越好(20160420记)
我通过不同容量和不同循环次数发现:1亿次循环的情况下(相当于生产者填充1亿条消息),spsc_queue的容量超过1000w后(最大容量不能超过8000w),性能将开始下降。
另外,性能取决于生产者和消费者两个线程填充和取出消息的速度,我这里测试时两个线程比较极端,生产者填充消息的速度大概时20w条/1ms,这种情况下,spsc_queue需要的最佳容量为最总1000w;如果生产者和消费者填充和取出消息的速度越慢,则相对容量就越低,我测下来的情况是:1千万次循环,如果生产者和消费者的循环内部再添加一个循环次数为100的逻辑,则两者处理消息的速度均会大幅下降,此时spsc_queue的容量为1w即可发挥最佳性能。
所以结论是:如果想榨干CPU的极限性能,还是得从实际项目中具体的需求上取做测试。个人建议是,将spsc_queue的容量设置为1w,因为一般游戏的上层业务消息的逻辑也挺复杂,估计随便一个逻辑的耗时都能达到1ms,这样一个工作线程1秒内只能处理1000条消息(假设波动范围为1000~5000),那么1w的队列容量是够的(注意是单读单写)。
5,spsc_queue使用相关的优化(20160422记)
void nonblocking_customer() { int req_pop = 512; int* valarr = new int[req_pop]; int pop_size; while (1) { if (pop_size = spscq.pop(valarr, req_pop)) { for(int i = 0; i < pop_size; i++) { count++; } } else { Sleep(1); } if (count >= LOOP_COUNT) { break; } } }
之前是每次pop一个元素,现在改成一次pop尽可能多的元素。这样优化以后,1千万次累加,非阻塞队列的耗时能从之前的90毫秒降低到33毫秒。测试发现,获取元素的数组大小设置为512就可以达到最佳性能了。