欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

对于线程间传递消息的一些想法 -- 智能指针+lambda caputre(封装task)

程序员文章站 2022-03-12 22:26:57
...

背景

最近公司开源了一个redis client,header only,项目地址: https://github.com/nokia/wiredis 。 作为公司build-block redis clinet 的维护者。 和作者聊了聊这个项目的想法和road map。 同时也提了一点建议,其中有个建议就是这篇文章要写的。

这个项目是header only的,所以文件很少,层次很清晰。主要有连个class,分为transport层和redis层。

想法就是利用boost::asio, 另起一个线程负责IO。在APP线程组装好要发送的RESP message 然后放到一个list中,通过io_service::dispatch通知IO thread 将其发送到网络。

想法的产生

在看代码的过程中我发现了如下的代码:

    void send(std::string &&buffer)
    {
        bool send_now = true;
        {
            std::unique_lock<std::mutex> guard(_send_buffer_mutex);
            send_now = _send_buffer.empty();

            if (_send_buffer_size + buffer.size() > SEND_BUFFER_LIMIT)
            {
                throw tcp_send_buffer_full("ERROR: TCP send buffer is full. Current limit is: " + std::to_string(SEND_BUFFER_LIMIT));
            }
            _send_buffer_size += buffer.size();

            // we can send now if the sending buffer is empty, otherwise the send-callback will do that.
            _send_buffer.emplace_back(std::move(buffer));
        }

        if (send_now)
        {
            _io_service.dispatch([this]() {
                try_to_send();
            });
        }
    }

乍一看,这段代码逻辑很正确。首先加锁,把要发送的string放到list 中,(解锁),通知IO thread。可是我觉得这个“锁”是不是多余了?作者说由于这个函数可能在不同的线程调用,所以_send_buffer可能在不同的线程中调操作,所以在这加了锁。

加锁这种操作会对系统性能造成一定影响,作为处女座,当然要想想怎么移除这个眼中钉。仔细分析了APP层和transport层,作者是想在发送redis RESP message之前储存一下callback,然后把要发的message放到一个list中。问题的root cause是至少有两个线程访问者两个数据结构。

第一想法是用无锁数据结构,这里应该是SPSC lockless queue。 之前也实现过, 但是这个是基于memory order的,虽然开销比锁少了很多, 但是还是有一定开销的。果断放弃。

既然不想从数据结构入手,那么就改变访问操作 ---- 能不能只让一个线程访问这两个数据结构嘞?把访问这两个数据结构的操作都丢给IO 线程岂不是解决问题了。

C++11 开始支持lambda函数,lambda函数有个特性就是可以捕捉变量。可以以值捕获或者引用捕获。对我们来说就是捕获要发送的信息。

这里就有问题:

  1. 值捕获会做拷贝
  2. 要发送的信息是一个右值,如果不用值捕获,马上就会失效

为了解决这个问题, 当然要有大胆的想法。

我们可以先搞个class,把要带到IOthread的东西封装一下,然后弄成一个智能指针,这样只要值捕获智能指针就OK了。当然智能指针捕获之后就相当于把owner转到IO 线程了。不会有多线程操作智能指针的尴尬(操作是异步的,shared_ptr会在APP线程立刻将引用计数减一在IOthread之前【forget about context switch】)。

其实就是把数据(message,callback)放到class中,把操作用lambda函数表示。也就是把要做的事情封装成task发送给IO线程处理。(数据+操作)

下面写个小例子作为例子:

#include <boost/asio.hpp>
#include <iostream>
#include <memory>
#include <thread>
#include <boost/asio/steady_timer.hpp>
#include <list>
#include <string>

std::list<std::string> _glob_list;
class test
{
  public:
    test() { std::cout << " in the ctor. thread id is : " << std::this_thread::get_id() << std::endl; }
    ~test() { std::cout << " in the dtor. thread id is : " << std::this_thread::get_id() << std::endl; }
    void hello()
    {
        _glob_list.push_back("test");
        std::cout << "hello world in thread : " << std::this_thread::get_id() << std::endl;
    }
};

int main()
{
    ::boost::asio::io_service ios;
    std::cout << " main thread id is : " << std::this_thread::get_id() << std::endl;
    // start timer to keep io_service running
    boost::asio::steady_timer _timer(ios);
    _timer.expires_from_now(std::chrono::milliseconds(10000));
    _timer.async_wait([](boost::system::error_code const &error) {
        std::cout << "timer fires" << std::endl;
        std::cout << "message in the list is : " << std::endl;
        for (auto it : _glob_list)
        {
            std::cout << it << std::endl;
        }
    });
    std::thread scheduler_thread(
        [&] {
            ios.run();
            std::cout << "ios exit" << std::endl;
        });
    std::this_thread::sleep_for(std::chrono::milliseconds(100));

    {
        std::shared_ptr<test> test_sptr(new test);
        ios.dispatch([test_sptr]() {
            test_sptr->hello();
        });
    }
    // simulate another thread access the list
    std::thread test_thread(
        [&] {
            ios.dispatch([]() {
                _glob_list.push_back("test");
            });
        });
    scheduler_thread.join();
    test_thread.join();
    std::cout << "exit test" << std::endl;
    return 0;
}

输出:

[email protected]:~/github/wiredis/wiredis/include/wiredis$ g++ test.cpp -lboost_system -lpthread -std=c++11
[email protected]:~/github/wiredis/wiredis/include/wiredis$ ./a.out
 main thread id is : 140558957459264
 in the ctor. thread id is : 140558957459264
hello world in thread : 140558937151232
 in the dtor. thread id is : 140558937151232
timer fires
message in the list is :
test
test
ios exit
exit test