欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

网络编程 26(下)阻塞 I/O 线程池模型

程序员文章站 2024-02-29 22:26:28
...

目标

使用线程池的方式,在服务端启动的同时,预先创建一定数量的线程,等待并处理连接

一、线程的两个概念

  • 锁(mutex):当前线程操作时,其它线程不能进入,保证线程安全
  • 条件变量(condition):多个线程需要交互的情况下,用来线程间同步的原语

二、阻塞 I/O 线程池模型

服务端

thread_pool_server.c

#include "common.h"

char rot13_char(char c) {
	if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M')) {
		return c + 13;
	} else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z')) {
		return c - 13;
	}
        return c;
}

void loop_echo(int fd) {
	char outbuf[512];
	int i;
	ssize_t result;

	while (1) {
		result = recv(fd, &outbuf, sizeof(outbuf), 0);

		if (result == 0) {
			break;
		} else if (result == -1) {
			error(1, errno, "recv failed");
			break;
		}

		for (i = 0; i < result; i++) {
			outbuf[i] = rot13_char(outbuf[i]);

			if (outbuf[i] == '\n') {
				send(fd, outbuf, result, 0);
				break;
			}
		}
	}
}

typedef struct {
	int size; // 队列长度
	int *fd;
	int front; // 队头
	int rear; // 队尾
	pthread_mutex_t mutex;
	int count; // 队列中实际元素个数
	pthread_cond_t noEmpty; // 队列非空条件
	pthread_cond_t noFull; // 队列未满条件
} block_queue;

void block_queue_init(block_queue *blockQueue, int size) {
	blockQueue->size = size;
	blockQueue->fd = calloc(size, sizeof(int));
	blockQueue->front = blockQueue->rear = blockQueue->count = 0;
	pthread_mutex_init(&blockQueue->mutex, NULL);
	pthread_cond_init(&blockQueue->noEmpty, NULL);
	pthread_cond_init(&blockQueue->noFull, NULL);
}

// 往描述符队列队尾放置一个描述符 fd
void block_queue_push(block_queue *blockQueue, int fd) {
	// 加锁
	pthread_mutex_lock(&blockQueue->mutex);
	// 描述符队列满了
	while (blockQueue->count == blockQueue->size) {
		// 等待不满,然后再继续往下执行
		pthread_cond_wait(&blockQueue->noFull, &blockQueue->mutex);
	}
	// 将描述符 fd 放到队尾,队尾下标 +1
	blockQueue->fd[blockQueue->rear++] = fd;
	// 如果队尾到了队列最大位置
	if (blockQueue->rear == blockQueue->size) {
		// 重置队尾位置???
		blockQueue->rear = 0;
	}
	blockQueue->count++;
	// 上面放了一个描述符 fd,此时队列就不空,则通知其它等待线程,有新的连接套接字描述符等待处理
	pthread_cond_signal(&blockQueue->noEmpty);
	// 解锁
	pthread_mutex_unlock(&blockQueue->mutex);
}

// 获取描述符队列队头描述符 fd 进行处理
int block_queue_pop(block_queue *blockQueue) {
	// 加锁
	pthread_mutex_lock(&blockQueue->mutex);
	// 描述符队头和队尾下标相等,说明队列空了,没有套接字描述符可以用
	while (blockQueue->front == blockQueue->rear) {
		// 等待非空,然后再继续往下执行
		pthread_cond_wait(&blockQueue->noEmpty, &blockQueue->mutex);
	}
	// 获取队头的描述符 fd,然后队头下标 +1
	int fd = blockQueue->fd[blockQueue->front++];
	// 如果队头到了队列最大位置
	if (blockQueue->front == blockQueue->size) {
		// 重置队头???
		blockQueue->front = 0;
	}
	blockQueue->count--;
	// 上面取了一个描述符 fd,此时队列就不满了,则通知其它等待线程,如果再有新的描述符 fd 连接的话,可以存起来了
	pthread_cond_signal(&blockQueue->noFull);
	// 解锁
	pthread_mutex_unlock(&blockQueue->mutex);
	// 返回连接套接字描述符 fd
	return fd;
}

// 线程入口函数
void *thread_run(void *arg) {
	pthread_t tid = pthread_self();
	// 设置线程成分离的
	pthread_detach(tid);

	block_queue *blockQueue = (block_queue *)arg;
	while (1) {
		int fd = block_queue_pop(blockQueue);
		loop_echo(fd);
	}
}

typedef struct {
    pthread_t thread_tid;
} Thread;

int main(int argc, char **argv) {
	int listenfd = socket(AF_INET, SOCK_STREAM, 0);

	struct sockaddr_in servaddr;
	bzero(&servaddr, sizeof(servaddr));
	servaddr.sin_family = AF_INET;
	servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
	servaddr.sin_port = htons(SERV_PORT);

	// 端口复用
	int on = 1;
	setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));

	socklen_t servlen = sizeof(servaddr);
	int bind_rt = bind(listenfd, (struct sockaddr *)&servaddr, servlen);
	if (bind_rt < 0) {
		error(1, errno, "bind failed");
	}

	int listen_rt = listen(listenfd, LISTENQ);
	if (listen_rt < 0) {
		error(1, errno, "listen failed");
	}

	block_queue blockQueue;
	block_queue_init(&blockQueue, BLOCK_QUEUE_SIZE);

	Thread *thread_array = calloc(THREAD_NUMBER, sizeof(Thread));

	int i;
	for (i = 0; i < THREAD_NUMBER; i++) {
		pthread_create(&(thread_array[i].thread_tid), NULL, thread_run, (void *)&blockQueue);
	}

	while (1) {
		struct sockaddr_storage ss;
		socklen_t slen = sizeof(ss);

		int fd = accept(listenfd, (struct sockaddr *)&ss, &slen);
		if (fd < 0) {
			error(1, errno, "accept failed");
		} else {
			// 往描述符队列队尾放置一个描述符 fd(当描述符队列非空时,会通知等待线程对描述符进行处理)
			block_queue_push(&blockQueue, fd);
		}
	}
}

头文件 common.h

#ifndef CHAP_26_COMMON_H
#define CHAP_26_COMMON_H

#include    <stdio.h>
#include    <stdlib.h>
#include    <string.h>
#include    <strings.h>
#include    <sys/socket.h>    /* basic socket definitions */
#include    <netinet/in.h>    /* sockaddr_in{} and other Internet defns */
#include    <arpa/inet.h>    /* inet(3) functions */
#include    <errno.h>

#include    <unistd.h>

#include    <pthread.h>

void error(int status, int err, char *fmt, ...);

#define    SERV_PORT         43211
#define    LISTENQ           1024

#define    BLOCK_QUEUE_SIZE  100
#define    THREAD_NUMBER     4

#endif //CHAP_26_COMMON_H

三、CMake 管理当前项目

① 代码组成

-CMakeLists.txt
-include:存放头文件
-src:存放源代码
网络编程 26(下)阻塞 I/O 线程池模型

CMakeLists.txt

CMAKE_MINIMUM_REQUIRED(VERSION 3.1)
SET(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/bin)
INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/include)
ADD_SUBDIRECTORY(src)

include 目录:include/common.h(common.h 上面有)网络编程 26(下)阻塞 I/O 线程池模型

src 目录(thread_pool_server.c 上面有,thread_server.c 是上一篇的内容)
网络编程 26(下)阻塞 I/O 线程池模型

src/CmakeLists.txt

ADD_EXECUTABLE(thread_server thread_server.c)
TARGET_LINK_LIBRARIES(thread_server pthread)

ADD_EXECUTABLE(thread_pool_server thread_pool_server.c)
TARGET_LINK_LIBRARIES(thread_pool_server pthread)

② 创建并进入 build 目录

mkdir build && cd build

网络编程 26(下)阻塞 I/O 线程池模型

③ 外部编译

cmake .. && make

网络编程 26(下)阻塞 I/O 线程池模型

四、测试

可以使用一个或多个 telnet 客户端连接服务器,检验交互是否正常

测试步骤
① 打开三个命令行窗口
② 其中一个窗口先执行服务器命令,输入命令 ./thread_server 后回车
③ 其余窗口执行客户端命令,输入命令 ./telnet-client 127.0.0.1 43211 后回车

左:服务端;右上、右下:客户端
网络编程 26(下)阻塞 I/O 线程池模型

上面的阻塞 I/O 线程池模型的服务端程序,可以并发处理多个不同的客户端连接,互不干扰

总结

问题:虽然线程是轻量级的,但如果并发连接过多,线程的频繁创建和销毁会有一定的开销(虽然上下文切换开销不大)
解:使用线程池的方式,在服务端启动的同时,预先创建一定数量的线程,避免频繁进行线程创建和销毁

相关标签: 网络编程