互斥量和信号量实现线程池
程序员文章站
2022-06-04 22:59:05
...
前言
在传统服务器结构中,常有一个总的监听线程监听新用户连接服务器,每当有一个新的用户进入,服务器就开启一个新的线程用户处理这个用户的数据包,这个线程只服务于这个用户,当用户与服务器端关闭连接以后,服务器端销毁这个线程
然而频繁地开辟与销毁线程极大地占用了系统的资源,线程池基本思想就是在程序开始时就在内存中开辟一些线程,通过对多个任务重用已经存在的线程对象,降低了对线程对象创建和销毁的开销,服务器只需要将数据包交给线程池就可以了
当有新的客户请求到达时,不是新创建一个线程,而是从“池子”中选择一个空闲的线程为新的客户请求服务,服务完毕后线程重新进入空闲线程池中。如果没有线程空闲的话,就将数据包暂时积累, 等待线程池内有线程空闲以后再进行处理
一般一个简单线程池包括:
- 线程组:创建好的多个线程
- 任务队列:存放未处理的任务,提供一种缓冲机制
- 互斥量:用于添加任务和执行任务时互斥操作任务队列
- 信号量:表示任务队列中是否有任务
一、线程池模型代码
代码实现如下:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <semaphore.h>
#include <pthread.h>
#define THREAD_NUM 5
typedef void* (*Func)(void *arg);
struct job {
Func func; // 任务处理函数
void *arg; // 任务处理数据
struct job *next; // 队列中下一个任务
};
pthread_t threads[THREAD_NUM]; // 线程池中的线程
struct job* head = NULL; // 任务队列,新任务插入到队列末尾,执行任务则取队列头节点
pthread_mutex_t mutex; // 互斥量,用于添加任务和执行任务时互斥操作任务队列
sem_t sem; // 信号量,表示任务队列中是否有任务
void* threadFunc(void* arg) // 线程池中线程函数
{
Func jobFunc;
void *jobArg;
struct job *node;
while (1)
{
sem_wait(&sem); // 等待有新任务插入到队列
pthread_mutex_lock(&mutex);
node = head;
head = head->next; // 读取头结点任务执行
pthread_mutex_unlock(&mutex);
jobFunc = node->func;
jobArg = node->arg;
jobFunc(jobArg); // 执行任务
free(node);
}
}
void addJob(Func func, void *arg)
{
struct job* jobNode = (struct job*)malloc(sizeof(struct job));
jobNode->func = func;
jobNode->arg = arg;
jobNode->next = NULL;
pthread_mutex_lock(&mutex);
if (NULL == head)
{
head = jobNode;
}
else
{
struct job *node = head;
while (node->next) // 新任务插入到队列末尾
{
node = node->next;
}
node->next = jobNode;
}
pthread_mutex_unlock(&mutex);
sem_post(&sem); // 通知线程池中线程有任务可执行
}
void init()
{
pthread_mutex_init(&mutex, NULL);
sem_init(&sem, 0, 0); // 初始队列中没有任务
for (int i=0; i<THREAD_NUM; i++)
{
pthread_create(&threads[i], NULL, threadFunc, NULL);
}
/*
addJob(xxx, xxx);
...
*/
}