欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

高精度/微秒级线程的实现

程序员文章站 2022-06-11 18:03:37
...

前言

在项目中需要实现一个功能,来对项目做一些特殊的工作.这个工作中需要实现某个线程中按照特定间隔(100微秒~10毫秒)来定时执行.实现过程中发现只要使用执行休眠的函数(sleep,sleep_for,sleep_until)每次线程轮询的时间都必定大于1.5毫秒(不同主频的CPU,可能时间会有一些差别),为了满足需求,使用一些方法来达到这个目的.

普通线程轮询

使用std::thread创建线程,使用std::chrono来处理时间.
创建一个线程,然后在线程当中运行,我们可以发现,线程轮询间隔大概是1500毫秒~2000毫秒.
代码如下(统计1001次,取平均值,并且已消除cout打印的时间影响)

#include "stdafx.h"
#include <windows.h>
#include <thread>
#include <iostream>
using namespace std;
using namespace std::chrono;
#define INTERVAL_ARRAY_LENGTH                1000 + 1 //第一个数据不使用
void thread_function_1() {
    high_resolution_clock::time_point time_point_start, time_point_end;
    uint64_t interval_in_microsecond, interval_array[INTERVAL_ARRAY_LENGTH] = {}, interval_array_index = 0;
    uint64_t total, average;
    cout << __FUNCTION__ << endl;
    while (1) {
        time_point_start = high_resolution_clock::now();
        std::this_thread::sleep_for(std::chrono::milliseconds(1));//这里替换成Sleep(1)效果也是一样的.
        time_point_end = high_resolution_clock::now();
        interval_in_microsecond = duration_cast<chrono::microseconds>(time_point_end - time_point_start).count();
        interval_array[interval_array_index++] = interval_in_microsecond;
        //当收集完数据后,对数据做一次统计和打印.
        if (interval_array_index == INTERVAL_ARRAY_LENGTH) {
            interval_array_index = 0;
            total = 0;
            for (int i = 1; i < INTERVAL_ARRAY_LENGTH; i++) {
                total += interval_array[i];
            }
            average = total / (INTERVAL_ARRAY_LENGTH - 1);
            std::cout << "thread[1] average:" << average << ",interval_array[0]=" << interval_array[0] << std::flush << std::endl;
        }
    }
}
int main()
{
    std::thread thread_1(thread_function_1);
    system("pause");
    return 0;
}

结果如下
高精度/微秒级线程的实现
从上面输出结果来看,每个线程的平均轮询时间是1500微秒~2000微秒.并且在上面的代码中,将std::this_thread::sleep_for(std::chrono::milliseconds(1));修改为std::this_thread::sleep_for(std::chrono::microseconds(100));,线程轮询的时间仍然为1500毫秒~2000毫秒之间,在MSDN上查阅一下sleep函数的定义,大概是这样”调用sleep之后,会让线程主动放弃CPU的使用权,此时CPU会去执行其他任务,当设定的时间到达后,CPU在执行这个线程,设定的时间单位是毫秒”,这里其实包含两个地方会导致延迟时间精度不高

  • 接口参数设定的单位是毫秒,不能精确到微秒/纳秒.
  • 放弃CPU使用权后,CPU会去执行其他任务,当其他任务(高优先级或同优先级)没有放弃CPU使用权时,该任务会一直等待,导致延迟时间加长.

针对上面的两个点,这里用timeBeginPeriod/timeEndPeriod来设置定时器的精度(这两个接口是msdn推荐的,但是单位仍然是毫秒…),同时提高该线程的优先级为最高优先级(同一进程中),修改后代码如下

#include "stdafx.h"
#include <windows.h>
#include <thread>
#include <iostream>
#pragma comment(lib,"WinMM.Lib")
using namespace std;
using namespace std::chrono;
#define INTERVAL_ARRAY_LENGTH                1000 + 1 //第一个数据不使用
void thread_function_1() {
    high_resolution_clock::time_point time_point_start, time_point_end;
    uint64_t interval_in_microsecond, interval_array[INTERVAL_ARRAY_LENGTH] = {}, interval_array_index = 0;
    uint64_t total, average;
    cout << __FUNCTION__ << endl;
    while (1) {
        time_point_start = high_resolution_clock::now();
        timeBeginPeriod(1);//set Minimum timer resolution.
        std::this_thread::sleep_for(std::chrono::microseconds(100));
        timeEndPeriod(1);
        time_point_end = high_resolution_clock::now();
        interval_in_microsecond = duration_cast<chrono::microseconds>(time_point_end - time_point_start).count();
        interval_array[interval_array_index++] = interval_in_microsecond;
        //当收集完数据后,对数据做一次统计和打印.
        if (interval_array_index == INTERVAL_ARRAY_LENGTH) {
            interval_array_index = 0;
            total = 0;
            for (int i = 1; i < INTERVAL_ARRAY_LENGTH; i++) {
                total += interval_array[i];
            }
            average = total / (INTERVAL_ARRAY_LENGTH - 1);
            std::cout << "thread[1] average:" << average << ",interval_array[0]=" << interval_array[0] << std::flush << std::endl;
        }
    }
}
int main()
{
    std::thread thread_1(thread_function_1);
    //set thread priority
    if (SetThreadPriority(thread_1.native_handle(), THREAD_PRIORITY_TIME_CRITICAL)) {
        cout << "set thread_1 priority to time critical" << endl;
    }
    system("pause");
    return 0;
}

结果如下
高精度/微秒级线程的实现
轮询的时间间隔有所改善,平均延迟时间大概在1500毫秒左右,比之前有所改善,不过离我们的目标还比较远.

微秒级线程实现

在google上找了一圈的资料……(省略2天时间),还是没办法在带休眠的情况下实现微秒级别的轮询,最后不得不使用一些措施来达到这种效果,主要的功能点是

  • 线程永不休眠(最大的坏处).
  • 将线程的优先级提高为最高优先级(如果还需要更高,需要同时把线程所在的进程同时提高到高优先级),防止被其他线程抢占.
  • 将线程绑定在CPU特定核心上运行,减少任务栈切换,提高效果,

实现代码如下

// high_precision_thread.cpp : Defines the entry point for the console application.
//
#include "stdafx.h"
#include <windows.h>
#include <iostream>
#include <thread>
#include <timeapi.h>
#pragma comment(lib,"WinMM.Lib")
using namespace std;
using namespace std::chrono;
#define THE_NUMBER_OF_PROCESSOR_CORES        4         //CPU核心数,本机是4个
#define INTERVAL_ARRAY_LENGTH                1000 + 1 //第一个数据包含了打印时间,剔除.
#define PROCESSOR_3_MASK                    0x01 << 3
void thread_function_1() {
    high_resolution_clock::time_point time_point_start, time_point_end;
    uint64_t interval_in_microsecond,interval_array[INTERVAL_ARRAY_LENGTH] = {}, interval_array_index = 0;
    uint64_t total, average;
    uint64_t processor_running_times[THE_NUMBER_OF_PROCESSOR_CORES] = {};
    while (1) {
        time_point_start = high_resolution_clock::now();
        timeBeginPeriod(1);
        std::this_thread::sleep_for(std::chrono::microseconds(100));
        timeEndPeriod(1);
        time_point_end = high_resolution_clock::now();
        //计算每次循环的时间.
        interval_in_microsecond = duration_cast<chrono::microseconds>(time_point_end - time_point_start).count();
        interval_array[interval_array_index++] = interval_in_microsecond;
        //记录当前线程运行在哪个CPU上.
        processor_running_times[GetCurrentProcessorNumber()]++;
        //每次打印会占用很多时间,先存储在数组中,最后一次性打印出来.
        if (interval_array_index == INTERVAL_ARRAY_LENGTH) {
            interval_array_index = 0;
            total = 0;
            for (int i = 1; i < INTERVAL_ARRAY_LENGTH; i++) {
                total += interval_array[i];
            }
            average = total / (INTERVAL_ARRAY_LENGTH - 1);
            //打印平均耗时,以及在每个core上运行的次数.
            std::cout << "thread[1] average:" << average << " microseconds,interval_array[0]=" << interval_array[0] << ",processor_run_times(" << processor_running_times[0] << "," << processor_running_times[1] << "," << processor_running_times[2] << "," << processor_running_times[3] << ")" << std::endl;
        }
    }
}
void thread_function_3() {
    high_resolution_clock::time_point time_point_start, time_point_end;
    uint64_t interval_in_microsecond, interval_array[INTERVAL_ARRAY_LENGTH] = {}, interval_array_index = 0;
    uint64_t total, average;
    uint64_t processor_running_times[THE_NUMBER_OF_PROCESSOR_CORES] = {};
    Sleep(100);
    high_resolution_clock::time_point tp_start = high_resolution_clock::now();
    while (1) {
        if (duration_cast<chrono::microseconds>(high_resolution_clock::now() - tp_start).count() >= 100) {
            high_resolution_clock::time_point tp_end = high_resolution_clock::now();
            //计算每次循环的时间.
            interval_in_microsecond = duration_cast<chrono::microseconds>(tp_end - tp_start).count();
            //每次打印会占用很多时间,先存储在数组中,最后一次性打印出来.
            interval_array[interval_array_index++] = interval_in_microsecond;
            tp_start = tp_end;
            processor_running_times[GetCurrentProcessorNumber()]++;
            //数组填满后,打印平均值.或则在debug模式下,看数组中的数据
            if (interval_array_index == INTERVAL_ARRAY_LENGTH) {
                interval_array_index = 0;
                total = 0;
                for (int i = 1; i < INTERVAL_ARRAY_LENGTH; i++) {
                    total += interval_array[i];
                }
                average = total / (INTERVAL_ARRAY_LENGTH - 1);
                //这里添加打印会调用std::cout会主动放弃core3,会有几率让core3被其他线程占用.
                std::cout << "thread[3] average:" << average << " microseconds,interval_array[0]=" << interval_array[0] << ",processor_run_times(" << processor_running_times[0] << "," << processor_running_times[1] << "," << processor_running_times[2] << "," << processor_running_times[3] << ")" << std::endl;
            }
        }
    }
}
int main()
{
    std::thread thread_1(thread_function_1),thread_3(thread_function_3);
    //设定线程3为最高优先级.
    SetThreadPriority(thread_3.native_handle(), THREAD_PRIORITY_TIME_CRITICAL);
    //将线程3绑定在core3上运行.
    SetThreadAffinityMask(thread_3.native_handle(), PROCESSOR_3_MASK);
    system("pause");
    return 0;
}

效果如下
高精度/微秒级线程的实现
从上图数据可以看出,线程3的轮询间隔是非常精准的100微秒(如果希望做到1微秒只需要将上面代码100修改为1即可).另外线程3只运行在core3上,而线程1多数运行在core0,core1,core2上,有少量次数运行在core3,运行在core3的原因是因为线程3中有std::cout打印导致的,如果将线程3的打印代码去除,可以看到线程1永远没机会运行在core3上.

最后

按照上面的方法虽然获得了高精度的线程轮询间隔,但是需要占用一个核心来做这个事情,是比较浪费资源的(最坏的情况会让4核CPU性能直接降低了25%),但目前来看没有其他方式能够实现精准的微秒的轮询功能(如果有,麻烦告诉我一声).在实际项目中,如果不得不使用这种方式来实现,这里个人觉得有不少可以优化的空间,就是想办法把这25%的功能用上(目前大部分时间是空转),理论上最好的情况可以完全消除这种资源的浪费:假设你的项目中有多个需要精确定时的模块A,B,C,D,E.这5个模块需要轮询的间隔都是100微秒,每个模块执行的时间都是20微秒,这样就可以实现完美的轮询间隔,而不造成任何资源的浪费(当然,这是理想的状态).
最后,这是一个抛砖引玉的例子,应该有更好的方法,欢迎指教,谢谢!

资料链接

scheduling priorities
MSDN:sleep
timeBeginPeriod