对于线程间传递消息的一些想法 -- 智能指针+lambda caputre(封装task)
背景
最近公司开源了一个redis client,header only,项目地址: https://github.com/nokia/wiredis 。 作为公司build-block redis clinet 的维护者。 和作者聊了聊这个项目的想法和road map。 同时也提了一点建议,其中有个建议就是这篇文章要写的。
这个项目是header only的,所以文件很少,层次很清晰。主要有连个class,分为transport层和redis层。
想法就是利用boost::asio, 另起一个线程负责IO。在APP线程组装好要发送的RESP message 然后放到一个list中,通过io_service::dispatch通知IO thread 将其发送到网络。
想法的产生
在看代码的过程中我发现了如下的代码:
void send(std::string &&buffer)
{
bool send_now = true;
{
std::unique_lock<std::mutex> guard(_send_buffer_mutex);
send_now = _send_buffer.empty();
if (_send_buffer_size + buffer.size() > SEND_BUFFER_LIMIT)
{
throw tcp_send_buffer_full("ERROR: TCP send buffer is full. Current limit is: " + std::to_string(SEND_BUFFER_LIMIT));
}
_send_buffer_size += buffer.size();
// we can send now if the sending buffer is empty, otherwise the send-callback will do that.
_send_buffer.emplace_back(std::move(buffer));
}
if (send_now)
{
_io_service.dispatch([this]() {
try_to_send();
});
}
}
乍一看,这段代码逻辑很正确。首先加锁,把要发送的string放到list 中,(解锁),通知IO thread。可是我觉得这个“锁”是不是多余了?作者说由于这个函数可能在不同的线程调用,所以_send_buffer可能在不同的线程中调操作,所以在这加了锁。
加锁这种操作会对系统性能造成一定影响,作为处女座,当然要想想怎么移除这个眼中钉。仔细分析了APP层和transport层,作者是想在发送redis RESP message之前储存一下callback,然后把要发的message放到一个list中。问题的root cause是至少有两个线程访问者两个数据结构。
第一想法是用无锁数据结构,这里应该是SPSC lockless queue。 之前也实现过, 但是这个是基于memory order的,虽然开销比锁少了很多, 但是还是有一定开销的。果断放弃。
既然不想从数据结构入手,那么就改变访问操作 ---- 能不能只让一个线程访问这两个数据结构嘞?把访问这两个数据结构的操作都丢给IO 线程岂不是解决问题了。
C++11 开始支持lambda函数,lambda函数有个特性就是可以捕捉变量。可以以值捕获或者引用捕获。对我们来说就是捕获要发送的信息。
这里就有问题:
- 值捕获会做拷贝
- 要发送的信息是一个右值,如果不用值捕获,马上就会失效
为了解决这个问题, 当然要有大胆的想法。
我们可以先搞个class,把要带到IOthread的东西封装一下,然后弄成一个智能指针,这样只要值捕获智能指针就OK了。当然智能指针捕获之后就相当于把owner转到IO 线程了。不会有多线程操作智能指针的尴尬(操作是异步的,shared_ptr会在APP线程立刻将引用计数减一在IOthread之前【forget about context switch】)。
其实就是把数据(message,callback)放到class中,把操作用lambda函数表示。也就是把要做的事情封装成task发送给IO线程处理。(数据+操作)
下面写个小例子作为例子:
#include <boost/asio.hpp>
#include <iostream>
#include <memory>
#include <thread>
#include <boost/asio/steady_timer.hpp>
#include <list>
#include <string>
std::list<std::string> _glob_list;
class test
{
public:
test() { std::cout << " in the ctor. thread id is : " << std::this_thread::get_id() << std::endl; }
~test() { std::cout << " in the dtor. thread id is : " << std::this_thread::get_id() << std::endl; }
void hello()
{
_glob_list.push_back("test");
std::cout << "hello world in thread : " << std::this_thread::get_id() << std::endl;
}
};
int main()
{
::boost::asio::io_service ios;
std::cout << " main thread id is : " << std::this_thread::get_id() << std::endl;
// start timer to keep io_service running
boost::asio::steady_timer _timer(ios);
_timer.expires_from_now(std::chrono::milliseconds(10000));
_timer.async_wait([](boost::system::error_code const &error) {
std::cout << "timer fires" << std::endl;
std::cout << "message in the list is : " << std::endl;
for (auto it : _glob_list)
{
std::cout << it << std::endl;
}
});
std::thread scheduler_thread(
[&] {
ios.run();
std::cout << "ios exit" << std::endl;
});
std::this_thread::sleep_for(std::chrono::milliseconds(100));
{
std::shared_ptr<test> test_sptr(new test);
ios.dispatch([test_sptr]() {
test_sptr->hello();
});
}
// simulate another thread access the list
std::thread test_thread(
[&] {
ios.dispatch([]() {
_glob_list.push_back("test");
});
});
scheduler_thread.join();
test_thread.join();
std::cout << "exit test" << std::endl;
return 0;
}
输出:
[email protected]:~/github/wiredis/wiredis/include/wiredis$ g++ test.cpp -lboost_system -lpthread -std=c++11
[email protected]:~/github/wiredis/wiredis/include/wiredis$ ./a.out
main thread id is : 140558957459264
in the ctor. thread id is : 140558957459264
hello world in thread : 140558937151232
in the dtor. thread id is : 140558937151232
timer fires
message in the list is :
test
test
ios exit
exit test