欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

互斥量和信号量实现线程池

程序员文章站 2022-06-04 22:59:05
...


前言

在传统服务器结构中,常有一个总的监听线程监听新用户连接服务器,每当有一个新的用户进入,服务器就开启一个新的线程用户处理这个用户的数据包,这个线程只服务于这个用户,当用户与服务器端关闭连接以后,服务器端销毁这个线程

然而频繁地开辟与销毁线程极大地占用了系统的资源,线程池基本思想就是在程序开始时就在内存中开辟一些线程,通过对多个任务重用已经存在的线程对象,降低了对线程对象创建和销毁的开销,服务器只需要将数据包交给线程池就可以了

当有新的客户请求到达时,不是新创建一个线程,而是从“池子”中选择一个空闲的线程为新的客户请求服务,服务完毕后线程重新进入空闲线程池中。如果没有线程空闲的话,就将数据包暂时积累, 等待线程池内有线程空闲以后再进行处理

一般一个简单线程池包括:

  1. 线程组:创建好的多个线程
  2. 任务队列:存放未处理的任务,提供一种缓冲机制
  3. 互斥量:用于添加任务和执行任务时互斥操作任务队列
  4. 信号量:表示任务队列中是否有任务

一、线程池模型代码

代码实现如下:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <semaphore.h>
#include <pthread.h>

#define     THREAD_NUM   5             
typedef     void* (*Func)(void *arg);  

struct job {    
    Func func;  // 任务处理函数
    void *arg;  // 任务处理数据
    struct job *next;  // 队列中下一个任务
};

pthread_t threads[THREAD_NUM];  // 线程池中的线程
struct job* head = NULL;        // 任务队列,新任务插入到队列末尾,执行任务则取队列头节点
pthread_mutex_t mutex;          // 互斥量,用于添加任务和执行任务时互斥操作任务队列
sem_t sem;                      // 信号量,表示任务队列中是否有任务

void* threadFunc(void* arg)     // 线程池中线程函数
{
    Func jobFunc;
    void *jobArg;
    struct job *node;

    while (1)
    {        
        sem_wait(&sem);     // 等待有新任务插入到队列
        pthread_mutex_lock(&mutex);
        node = head;
        head = head->next;  // 读取头结点任务执行
        pthread_mutex_unlock(&mutex);

        jobFunc = node->func;
        jobArg = node->arg;
        jobFunc(jobArg);    // 执行任务
        free(node);
    }
}

void addJob(Func func, void *arg)
{
    struct job* jobNode = (struct job*)malloc(sizeof(struct job));
    jobNode->func = func;
    jobNode->arg = arg;
    jobNode->next = NULL;

    pthread_mutex_lock(&mutex);
    if (NULL == head)           
    {
        head = jobNode;
    }
    else
    {
        struct job *node = head;
        while (node->next)          // 新任务插入到队列末尾 
        {
            node = node->next;
        } 
        node->next = jobNode; 
    }
    pthread_mutex_unlock(&mutex);
    sem_post(&sem);                 // 通知线程池中线程有任务可执行
}

void init()
{
    pthread_mutex_init(&mutex, NULL);
    sem_init(&sem, 0, 0);           // 初始队列中没有任务

    for (int i=0; i<THREAD_NUM; i++)
    {
        pthread_create(&threads[i], NULL, threadFunc, NULL);
    }
    
    /* 
        addJob(xxx, xxx);
        ...
    */
}