欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

利用信号量实现线程同步

程序员文章站 2024-03-01 20:16:46
...

本篇使用信号量机制实现对全局资源的正确使用,包括以下两点:

  • 各个子线程对全局资源的互斥使用
  • 主线程对子线程的同步

信号量

简单的说,信号量内核对象,也是多线程同步的一种机制,它可以对资源访问进行计数,包括最大资源计数和当前资源计数,是两个32位的值;另外,计数是以原子访问的方式进行,由操作系统维护;

  • 最大资源计数,表示可以控件的最大资源数量
  • 当前资源计数,表示当前可用资源的数量

信号量的规则:

  • 如果当前资源计数器大于0,那么信号量处于触发状态
  • 如果当前资源计数器等于0,那么信号量处于未触发状态
  • 系统绝对不会让当前资源计数器变为负数
  • 当前资源计数器决定不会大于最大资源计数

信号量机制:

以一个停车场的运作为例。假设停车场只有三个车位,一开始三个车位都是空的。这时如果同时来了五辆车,看门人允许其中三辆直接进入,然后放下车拦,剩下的车则必须在入口等待,此后来的车也都不得不在入口处等待。这时,有一辆车离开停车场,看门人得知后,打开车拦,放入外面的一辆进去,如果又离开两辆,则又可以放入两辆,如此往复。

在这个停车场系统中,车位是公共资源,每辆车好比一个线程,看门人起的就是信号量的作用,当当前资源计数大于0,信号量处于触发状态,线程编程可调度;

相关API

  • 创建信号量
HANDLE WINAPI CreateSemaphore(
    LPSECURITY_ATTRIBUTES lpSemaphoreAttributes,//more设置为NULL
    LONG lInitialCount,   //当前可用资源的数量
    LONG lMaximumCount,   //最大资源数量
    LPCTSTR lpName        //信号量名字,可以设置为NULL
);
  • 释放信号量
BOOL WINAPI ReleaseSemaphore(
    HANDLE hSemaphore,      //信号量句柄
    LONG lReleaseCount,     //该函数返回时,当前可用资源的数增加lReleaseCount个
    LPLONG lpPreviousCount  
);
  • 打开已经存在的信号量
HANDLE WINAPI OpenSemaphore(
    DWORD dwDesiredAccess,
    BOOL bInheritHandle,
    LPCTSTR lpName //打开信号量的名字
);
  • wait函数
DWORD WINAPI WaitForSingleObject(
    HANDLE hHandle,      //当等待成功时,当前可用信号量递减
    DWORD dwMilliseconds //等待时间
);

线程同步举例

线程同步包含两层含义:

  1. 对全局资源互斥访问
  2. 线程间的有序、协同执行(比如:线程A执行完后,再进行线程B的操作)

例子1 互斥访问

#include "stdafx.h"
#include <iostream>
#include <afxmt.h>
using namespace std;

int g_nIndex = 0;
const int nMaxCnt = 20;

#define MAX_SEM_COUNT 1
#define THREADCOUNT 12

HANDLE ghSemaphore;

DWORD WINAPI ThreadProcBySEM( LPVOID );

int _tmain(int argc, _TCHAR* argv[])
{
    HANDLE aThread[THREADCOUNT];
    int i;

    // Create a semaphore with initial and max counts of MAX_SEM_COUNT
    // 备注:实现多个子线程资源互斥范围,最大信号量需要设置为1
    ghSemaphore = CreateSemaphore( 
        NULL,   // default security attributes
        1,      // initial count
        1,      // maximum count
        NULL);  // unnamed semaphore
    if (ghSemaphore == NULL) 
    {
        printf("CreateSemaphore error: %d\n", GetLastError());
        return 0;
    }

    // Create worker threads
    for( i=0; i < THREADCOUNT; i++ )
    {
        aThread[i] = CreateThread( 
            NULL,       // default security attributes
            0,          // default stack size
            (LPTHREAD_START_ROUTINE) ThreadProcBySEM, 
            NULL,       // no thread function arguments
            0,          // default creation flags
            NULL); // receive thread identifier

        if( aThread[i] == NULL )
        {
            printf("CreateThread error: %d\n", GetLastError());
            return 0;
        }
    }

    //Wait for all threads to terminate
    WaitForMultipleObjects(THREADCOUNT, aThread, TRUE, INFINITE);

    //Close thread and semaphore handles
    for( i=0; i < THREADCOUNT; i++ )
        CloseHandle(aThread[i]);

    CloseHandle(ghSemaphore);

    return 0;
}


DWORD WINAPI ThreadProcBySEM(LPVOID lpParam)
{
    DWORD dwWaitResult;
    BOOL bContinue = TRUE;
    while(bContinue)
    {
        //Try to enter the semaphore gate.
        dwWaitResult = WaitForSingleObject(ghSemaphore,INFINITE);
        if (WAIT_OBJECT_0 == dwWaitResult)
        {
            if (g_nIndex++ < nMaxCnt)
            {
                //Simulate thread spending time on task
                Sleep(5);
                printf("Thread %d: Index = %d\n", GetCurrentThreadId(), g_nIndex);
            }
            else
            {
                bContinue = FALSE;
            }

            // Relase the semaphore when task is finished
            if (!ReleaseSemaphore( 
                ghSemaphore,  // handle to semaphore
                1,            // increase count by one
                NULL))       // not interested in previous count
            {
                printf("ReleaseSemaphore error: %d\n", GetLastError());
            }
        }
        else
        {
            break;
        }
    }

    return TRUE;
}

运行结果:

利用信号量实现线程同步

例子2 线程同步

下面我们将线程创建时的顺序编号,也打印出来,实现方式是将变量i传递给线程,以实现打印;

错误代码:

我们将编号i的地址作为CreateThread入参,传递给线程函数ThreadProcBySEM;主要修改如下:

 // Create worker threads
 for( i=0; i < THREADCOUNT; i++ )
 {
     aThread[i] = CreateThread( 
         NULL,       // default security attributes
         0,          // default stack size
         (LPTHREAD_START_ROUTINE) ThreadProcBySEM, 
         &i,         //线程编号地址
         0,          // default creation flags
         NULL);      // receive thread identifier

     if( aThread[i] == NULL )
     {
         printf("CreateThread error: %d\n", GetLastError());
         return 0;
     }
 }
 //线程函数代码:
DWORD WINAPI ThreadProcBySEM(LPVOID lpParam)
{
    DWORD dwWaitResult;
    BOOL bContinue = TRUE;
    if (NULL == lpParam)
    {
        return FALSE;
    }
    //获取编号,线程创建有时间开销,当函数执行到,赋值语句时
    //lpParam所指向的是变量i,但是主线程已经对i进行++,不再是创建时的值了
    int nThreadNum = *(int*)lpParam;

    if (WAIT_OBJECT_0 == dwWaitResult)
    {
        if (g_nIndex++ < nMaxCnt)
        {
            //Simulate thread spending time on task
            Sleep(5);
            //打印线程编号
            printf("NO %d = Thread %d: Index = %d\n", nThreadNum, GetCurrentThreadId(), g_nIndex);
        }
    } 
}

运行结果:

从下图中可以看出,不同的线程句柄,却有相同的线程编号,属于异常线程;

利用信号量实现线程同步

原因分析:

线程创建到线程函数执行存在时间开销,线程函数不会第一时间开始执行,而此时主线程还处于非阻塞状态,主线程仍然可以对变量i不断进行++操作,导致当前线程进行访问时,其内容已经不再是当前那个值了;

正确做法:

1.引入关键代码段同步对象,用于各个子线间,访问全局资源g_nIndex,
2.在创建新线程前,需要将当前线程的编号保存到,临时变量;
3.将信号量对象用于主线程和子线程间的同步;

全部代码如下:

#include "stdafx.h"
//#include <windows.h>
#include <iostream>
#include <afxmt.h>
using namespace std;

#define MAX_SEM_COUNT 1
#define THREADCOUNT 12

int g_nIndex = 0;
const int nMaxCnt = 20;

HANDLE ghSemaphore;
CRITICAL_SECTION g_csLockA;

DWORD WINAPI ThreadProcBySEM( LPVOID );

int _tmain(int argc, _TCHAR* argv[])
{
    HANDLE aThread[THREADCOUNT];
    int i;

    // Create a semaphore with initial and max counts of MAX_SEM_COUNT
    ghSemaphore = CreateSemaphore( 
        NULL,   // default security attributes
        0,      // initial count,modify:当前处于非触发状态
        1,      // maximum count
        NULL);  // unnamed semaphore
    if (ghSemaphore == NULL) 
    {
        printf("CreateSemaphore error: %d\n", GetLastError());
        return 0;
    }

    //初始化关键代码段对象,用于访问全局资源的互斥
    InitializeCriticalSection(&g_csLockA);

    // Create worker threads
    for( i=0; i < THREADCOUNT; i++ )
    {

        aThread[i] = CreateThread( 
            NULL,       // default security attributes
            0,          // default stack size
            (LPTHREAD_START_ROUTINE) ThreadProcBySEM, 
            &i,       // no thread function arguments
            0,          // default creation flags
            NULL); // receive thread identifier

        if( aThread[i] == NULL )
        {
            printf("CreateThread error: %d\n", GetLastError());
            return 0;
        }
        //modify 创建新线程前,需要将编号保存起来,才创建新线程
        //用于同步主线程和子线程
        WaitForSingleObject(ghSemaphore,INFINITE);
    }

    //Wait for all threads to terminate
    WaitForMultipleObjects(THREADCOUNT, aThread, TRUE, INFINITE);

    //Close thread and semaphore handles
    for( i=0; i < THREADCOUNT; i++ )
        CloseHandle(aThread[i]);

    CloseHandle(ghSemaphore);

    DeleteCriticalSection(&g_csLockA);

    return 0;
}


DWORD WINAPI ThreadProcBySEM(LPVOID lpParam)
{
    DWORD dwWaitResult;
    BOOL bContinue = TRUE;
    if (NULL == lpParam)
    {
        return FALSE;
    }
    int nThreadNum = *(int*)lpParam;

    // Relase the semaphore 
    if (!ReleaseSemaphore( 
        ghSemaphore,  // handle to semaphore
        1,            // increase count by one
        NULL))       // not interested in previous count
    {
        printf("ReleaseSemaphore error: %d\n", GetLastError());
    }

    while(bContinue)
    {
        Sleep(10);//modify:当前线程处于休眠,给其他线程执行机会

        //Try to enter the cs gate. //modify:保护资源的唯一性访问
        EnterCriticalSection(&g_csLockA);
        if (g_nIndex++ < nMaxCnt)
        {
            printf("NO %02d = Thread %d: Index = %d\n", nThreadNum, GetCurrentThreadId(), g_nIndex);
        }
        else
        {
            bContinue = FALSE;
        }

        LeaveCriticalSection(&g_csLockA);
    }

    return TRUE;
}

运行结果:

利用信号量实现线程同步

从运行结果看出,线程句柄和编号可以一一对应,并且资源访问也正常;