网络编程 26(下)阻塞 I/O 线程池模型
程序员文章站
2024-02-29 22:26:28
...
网络编程 26(下)阻塞 I/O 线程池模型
目标
使用线程池的方式,在服务端启动的同时,预先创建一定数量的线程,等待并处理连接
一、线程的两个概念
- 锁(mutex):当前线程操作时,其它线程不能进入,保证线程安全
- 条件变量(condition):多个线程需要交互的情况下,用来线程间同步的原语
二、阻塞 I/O 线程池模型
服务端
thread_pool_server.c
#include "common.h"
char rot13_char(char c) {
if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M')) {
return c + 13;
} else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z')) {
return c - 13;
}
return c;
}
void loop_echo(int fd) {
char outbuf[512];
int i;
ssize_t result;
while (1) {
result = recv(fd, &outbuf, sizeof(outbuf), 0);
if (result == 0) {
break;
} else if (result == -1) {
error(1, errno, "recv failed");
break;
}
for (i = 0; i < result; i++) {
outbuf[i] = rot13_char(outbuf[i]);
if (outbuf[i] == '\n') {
send(fd, outbuf, result, 0);
break;
}
}
}
}
typedef struct {
int size; // 队列长度
int *fd;
int front; // 队头
int rear; // 队尾
pthread_mutex_t mutex;
int count; // 队列中实际元素个数
pthread_cond_t noEmpty; // 队列非空条件
pthread_cond_t noFull; // 队列未满条件
} block_queue;
void block_queue_init(block_queue *blockQueue, int size) {
blockQueue->size = size;
blockQueue->fd = calloc(size, sizeof(int));
blockQueue->front = blockQueue->rear = blockQueue->count = 0;
pthread_mutex_init(&blockQueue->mutex, NULL);
pthread_cond_init(&blockQueue->noEmpty, NULL);
pthread_cond_init(&blockQueue->noFull, NULL);
}
// 往描述符队列队尾放置一个描述符 fd
void block_queue_push(block_queue *blockQueue, int fd) {
// 加锁
pthread_mutex_lock(&blockQueue->mutex);
// 描述符队列满了
while (blockQueue->count == blockQueue->size) {
// 等待不满,然后再继续往下执行
pthread_cond_wait(&blockQueue->noFull, &blockQueue->mutex);
}
// 将描述符 fd 放到队尾,队尾下标 +1
blockQueue->fd[blockQueue->rear++] = fd;
// 如果队尾到了队列最大位置
if (blockQueue->rear == blockQueue->size) {
// 重置队尾位置???
blockQueue->rear = 0;
}
blockQueue->count++;
// 上面放了一个描述符 fd,此时队列就不空,则通知其它等待线程,有新的连接套接字描述符等待处理
pthread_cond_signal(&blockQueue->noEmpty);
// 解锁
pthread_mutex_unlock(&blockQueue->mutex);
}
// 获取描述符队列队头描述符 fd 进行处理
int block_queue_pop(block_queue *blockQueue) {
// 加锁
pthread_mutex_lock(&blockQueue->mutex);
// 描述符队头和队尾下标相等,说明队列空了,没有套接字描述符可以用
while (blockQueue->front == blockQueue->rear) {
// 等待非空,然后再继续往下执行
pthread_cond_wait(&blockQueue->noEmpty, &blockQueue->mutex);
}
// 获取队头的描述符 fd,然后队头下标 +1
int fd = blockQueue->fd[blockQueue->front++];
// 如果队头到了队列最大位置
if (blockQueue->front == blockQueue->size) {
// 重置队头???
blockQueue->front = 0;
}
blockQueue->count--;
// 上面取了一个描述符 fd,此时队列就不满了,则通知其它等待线程,如果再有新的描述符 fd 连接的话,可以存起来了
pthread_cond_signal(&blockQueue->noFull);
// 解锁
pthread_mutex_unlock(&blockQueue->mutex);
// 返回连接套接字描述符 fd
return fd;
}
// 线程入口函数
void *thread_run(void *arg) {
pthread_t tid = pthread_self();
// 设置线程成分离的
pthread_detach(tid);
block_queue *blockQueue = (block_queue *)arg;
while (1) {
int fd = block_queue_pop(blockQueue);
loop_echo(fd);
}
}
typedef struct {
pthread_t thread_tid;
} Thread;
int main(int argc, char **argv) {
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in servaddr;
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(SERV_PORT);
// 端口复用
int on = 1;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
socklen_t servlen = sizeof(servaddr);
int bind_rt = bind(listenfd, (struct sockaddr *)&servaddr, servlen);
if (bind_rt < 0) {
error(1, errno, "bind failed");
}
int listen_rt = listen(listenfd, LISTENQ);
if (listen_rt < 0) {
error(1, errno, "listen failed");
}
block_queue blockQueue;
block_queue_init(&blockQueue, BLOCK_QUEUE_SIZE);
Thread *thread_array = calloc(THREAD_NUMBER, sizeof(Thread));
int i;
for (i = 0; i < THREAD_NUMBER; i++) {
pthread_create(&(thread_array[i].thread_tid), NULL, thread_run, (void *)&blockQueue);
}
while (1) {
struct sockaddr_storage ss;
socklen_t slen = sizeof(ss);
int fd = accept(listenfd, (struct sockaddr *)&ss, &slen);
if (fd < 0) {
error(1, errno, "accept failed");
} else {
// 往描述符队列队尾放置一个描述符 fd(当描述符队列非空时,会通知等待线程对描述符进行处理)
block_queue_push(&blockQueue, fd);
}
}
}
头文件 common.h
#ifndef CHAP_26_COMMON_H
#define CHAP_26_COMMON_H
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <sys/socket.h> /* basic socket definitions */
#include <netinet/in.h> /* sockaddr_in{} and other Internet defns */
#include <arpa/inet.h> /* inet(3) functions */
#include <errno.h>
#include <unistd.h>
#include <pthread.h>
void error(int status, int err, char *fmt, ...);
#define SERV_PORT 43211
#define LISTENQ 1024
#define BLOCK_QUEUE_SIZE 100
#define THREAD_NUMBER 4
#endif //CHAP_26_COMMON_H
三、CMake 管理当前项目
① 代码组成
-CMakeLists.txt
-include:存放头文件
-src:存放源代码
CMakeLists.txt
CMAKE_MINIMUM_REQUIRED(VERSION 3.1)
SET(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/bin)
INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/include)
ADD_SUBDIRECTORY(src)
include 目录:include/common.h(common.h 上面有)
src 目录(thread_pool_server.c 上面有,thread_server.c 是上一篇的内容)
src/CmakeLists.txt
ADD_EXECUTABLE(thread_server thread_server.c)
TARGET_LINK_LIBRARIES(thread_server pthread)
ADD_EXECUTABLE(thread_pool_server thread_pool_server.c)
TARGET_LINK_LIBRARIES(thread_pool_server pthread)
② 创建并进入 build 目录
mkdir build && cd build
③ 外部编译
cmake .. && make
四、测试
可以使用一个或多个 telnet 客户端连接服务器,检验交互是否正常
测试步骤
① 打开三个命令行窗口
② 其中一个窗口先执行服务器命令,输入命令 ./thread_server
后回车
③ 其余窗口执行客户端命令,输入命令 ./telnet-client 127.0.0.1 43211
后回车
左:服务端;右上、右下:客户端
上面的阻塞 I/O 线程池模型的服务端程序,可以并发处理多个不同的客户端连接,互不干扰
总结
问题:虽然线程是轻量级的,但如果并发连接过多,线程的频繁创建和销毁会有一定的开销(虽然上下文切换开销不大)
解:使用线程池的方式,在服务端启动的同时,预先创建一定数量的线程,避免频繁进行线程创建和销毁
下一篇: iOS Block的内存管理