欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

UNP卷一chapter26 线程

程序员文章站 2024-03-04 10:12:11
...

1、并发服务器中,用线程代替fork

fork的成本昂贵,fork返回后父子进程间信息的传递需要进程间通信(IPC)机制。但线程有助于解决这两个问题。故线程有时称为轻权进程,因为线程比进程“权重轻些”。

同一进程内的所有线程共享相同的全局内存。这使得线程之间易于共享信息,然而伴随而来的却是同步问题。

同一进程内的所有线程除了共享全局变量外还共享:进程指令、大多数数据、打开的文件、信号处理函数和信号处置、当前工作目录、用户ID和组ID。每个线程有各自的线程ID、寄存器集合,包括程序计数器和栈指针、栈(用于存放局部变量和返回地址)、errno、信号掩码、优先级等。

2、基本线程函数

i、pthread_create函数

当一个程序由exec启动时,称为初始线程或主线程的单个线程就创建了,其余线程则由pthread_create函数创建。

#include<pthread.h>
int pthread_create(pthread_t *tid, const pthread_attr_t *attr,
	void *(*func)(void*), void *arg);//返回:若成功则为0,若出错则为正的Exxx值

新的线程成功创建,其线程ID通过tid指针返回,attr指定线程属性或采纳默认设置(NULL),func为线程执行的函数,arg作为唯一调用参数。

ii、pthread_join函数(通过调用pthread_joint等待一个给定线程终止,pthread_create类似于fork,pthread_join类似于waitpid)

#include<pthread.h>
int pthread_join(pthread_t *tid, void **status);////返回:若成功则为0,若出错则为正的Exxx值

此函数没办法等待任意一个线程。如果status指针非空,来自所等待线程的返回值(一个指向某个对象的指针)将存入由status指向的位置。

iii、pthread_self函数

每个线程使用pthread_self获取自身的线程ID。

#include<pthread.h>
pthread_t pthread_self(void);//调用线程的线程id

iv、pthread_detach函数

当一个可汇合的线程终止时,它的线程ID和退出状态将留存到另一个线程对它调用pthread_join。脱离的线程却像守护进程,当它们终止时,所有相关资源都被释放。pthread_detach函数把指定的线程转变为脱离状态。

#include<pthread.h>
int pthread_detach(pthread_t tid);//返回:若成功则为0,若出错则为正的Exxx值

v、pthread_exit函数(线程终止的方法之一调用pthread_exit)

#include<pthread.h>
void pthread_exit(void *status);//不返回到调用者

指针status不能指向局部于调用线程的对象,因为线程终止时这样的对象也消失。

如果本线程未曾脱离,它的线程ID和退出状态将一直留存到调用进程内的某个其他线程对它调用pthread_join。

3、使用线程的str_cli函数及tcp回射服务器程序(存在着多个线程不同步访问一个共享变量问题,这是线程使用中必须要注意的问题)

UNP卷一chapter26 线程

i、使用线程的str_cli函数

#include	"unpthread.h"

void	*copyto(void *);

static int	sockfd;		/* global for both threads to access */
static FILE	*fp;//避免了创建一个需要传入的结构体参数,于是可令pthread_create最后一个参数为NULL

void
str_cli(FILE *fp_arg, int sockfd_arg)
{
	char		recvline[MAXLINE];
	pthread_t	tid;

	sockfd = sockfd_arg;	/* copy arguments to externals */
	fp = fp_arg;

	Pthread_create(&tid, NULL, copyto, NULL);//构造线程,采用默认设置参数,其执行函数为copyto,传入参数为NULL

	while (Readline(sockfd, recvline, MAXLINE) > 0)
		Fputs(recvline, stdout);
}

void *
copyto(void *arg)
{
	char	sendline[MAXLINE];

	while (Fgets(sendline, MAXLINE, fp) != NULL)
		Writen(sockfd, sendline, strlen(sendline));

	Shutdown(sockfd, SHUT_WR);	/* EOF on stdin, send FIN */

	return(NULL);
	/* 4return (i.e., thread terminates) when EOF on stdin */
}
ii、使用线程且参数传递更具移植性的tcp回射服务器程序
#include	"unpthread.h"

static void	*doit(void *);		/* each thread executes this function */

int
main(int argc, char **argv)
{
	int				listenfd, *iptr;
	thread_t		tid;
	socklen_t		addrlen, len;
	struct sockaddr	*cliaddr;

	if (argc == 2)
		listenfd = Tcp_listen(NULL, argv[1], &addrlen);
	else if (argc == 3)
		listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
	else
		err_quit("usage: tcpserv01 [ <host> ] <service or port>");

	cliaddr = Malloc(addrlen);

	for ( ; ; ) {
		len = addrlen;
		iptr = Malloc(sizeof(int));//分配一块内存
		*iptr = Accept(listenfd, cliaddr, &len);//内存上写入相应套接字描述符
		Pthread_create(&tid, NULL, &doit, iptr);//指定线程执行函数,并传输指向相应套接字描述符指针
	}
}

static void *
doit(void *arg)
{
	int		connfd;

	connfd = *((int *) arg);//在线程函数中,用另一局部变量存储相应套接字描述符
	free(arg);//释放主线程中释放的动态内存

	Pthread_detach(pthread_self());
	str_echo(connfd);		/* same function as before */
	Close(connfd);			/* done with connected socket */
	return(NULL);
}

4、线程特定数据

存在的问题:把一个未线程化的程序转换成使用线程的版本时,有时会碰到因其中有函数使用静态变量而引起的一个常见编程错误(线程安全)。此处利用使用线程特定数据的方法使现有函数达成线程安全。

系统(可能是线程函数库)为每个进程维护一个Key结构的结构数组,如下图如示,

UNP卷一chapter26 线程

除进程范围的Key结构数组外,系统还在进程内维护关于每个线程的多条信息,即pthread结构,如下图所示,

UNP卷一chapter26 线程

处理线程特定数据时通常需要调用的如下函数,

#include<pthread.h>
int pthread_once(pthread_once_t *onceptr, void(*init)(void));//确保pthread_key_create只被第一个调用函数的线程所调用

int pthread_key_create(pthread_key_t *keyptr, void(*destructor)(void* value));//所创建的键通过keyptr指针参数返回
                     //均返回:若成功则为0,若出错则为正的Exxx值

//以下两函数分别用于获取和存放与某个键关联的值,该值一般为指针,且指向一个动态分配的内存区
void *pthread_getspecific(pthread_key_t key);
                     //返回:指向线程特定数据的指针(有可能是一个空指针)
int pthread_setspecific(pthread_key_t key, const void* value);
                     //返回:若成功则为0,若出错则为正的Exxx值

使用线程特定数据的readline函数,其代码如下,

#include	"unpthread.h"

static pthread_key_t	rl_key;
static pthread_once_t	rl_once = PTHREAD_ONCE_INIT;

static void
readline_destructor(void *ptr)
{
	free(ptr);
}

static void
readline_once(void)
{
	Pthread_key_create(&rl_key, readline_destructor);
}

typedef struct {
	int	 rl_cnt;			/* initialize to 0 */
	char	*rl_bufptr;			/* initialize to rl_buf */
	char	 rl_buf[MAXLINE];
} Rline;


static ssize_t
my_read(Rline *tsd, int fd, char *ptr)
{
	if (tsd->rl_cnt <= 0) {
	again:
		if ((tsd->rl_cnt = read(fd, tsd->rl_buf, MAXLINE)) < 0) {
			if (errno == EINTR)
				goto again;
			return(-1);
		}
		else if (tsd->rl_cnt == 0)
			return(0);
		tsd->rl_bufptr = tsd->rl_buf;
	}

	tsd->rl_cnt--;
	*ptr = *tsd->rl_bufptr++;
	return(1);
}

ssize_t
readline(int fd, void *vptr, size_t maxlen)
{
	size_t		n, rc;
	char	c, *ptr;
	Rline	*tsd;

	Pthread_once(&rl_once, readline_once);//使本进程内第一个调用readline线程通过调用pthread_once创建线程特定数据键
	if ((tsd = pthread_getspecific(rl_key)) == NULL) {//返回指向线程特定数据的指针,返回值是一个空指针
		tsd = Calloc(1, sizeof(Rline));		/* init to 0 */
		Pthread_setspecific(rl_key, tsd);//相当于设置键(rl_key)-值(tsd)的关系
	}

	ptr = vptr;
	for (n = 1; n < maxlen; n++) {
		if ((rc = my_read(tsd, fd, &c)) == 1) {
			*ptr++ = c;
			if (c == '\n')
				break;
		}
		else if (rc == 0) {
			*ptr = 0;
			return(n - 1);		/* EOF, n - 1 bytes read */
		}
		else
			return(-1);		/* error, errno set by read() */
	}

	*ptr = 0;
	return(n);
}

5、互斥锁、条件变量

i、互斥锁

多个线程更改一个共享变量的问题,其解决办法之一是使用一个互斥锁保护这个共享变量。访问该变量的前提条件是持有该互斥锁。以下两个分别是互斥锁上锁与解锁函数,

#include<pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mptr);
int pthread_mutex_unlock(pthread_mutex_t* mptr);
                                //均返回:若成功则为0,若出错则为正的Exxx值

如果试图上锁已被另外某个线程锁住的一个互斥锁,本线程将被阻塞,直至该互斥锁被解锁为止。

如果某个互斥锁变量是静态分配的,就必须把它们初始化为常值PTHREAD_MUTEX_INITIALIZER。

使用互斥锁保护共享变量相应举例,

#include	"unpthread.h"

#define	NLOOP 5000

int				counter;		/* incremented by threads */
pthread_mutex_t	counter_mutex = PTHREAD_MUTEX_INITIALIZER;//共享内存区中分配互斥锁,需要在运行时将其初始化

void	*doit(void *);

int
main(int argc, char **argv)
{
	pthread_t	tidA, tidB;

	Pthread_create(&tidA, NULL, &doit, NULL);
	Pthread_create(&tidB, NULL, &doit, NULL);

		/* 4wait for both threads to terminate */
	Pthread_join(tidA, NULL);
	Pthread_join(tidB, NULL);

	exit(0);
}

void *
doit(void *vptr)
{
	int		i, val;

	/*
	 * Each thread fetches, prints, and increments the counter NLOOP times.
	 * The value of the counter should increase monotonically.
	 */

	for (i = 0; i < NLOOP; i++) {
		Pthread_mutex_lock(&counter_mutex);//上锁

		val = counter;
		printf("%d: %d\n", pthread_self(), val + 1);
		counter = val + 1;

		Pthread_mutex_unlock(&counter_mutex);//解锁
	}

	return(NULL);
}

ii、条件变量

需要一个让主循环进入睡眠,直到某个线程通知它有事可做才醒来的方法。首先条件变量结合互斥锁的方法,互斥锁提供互斥机制,条件变量提供信号机制。条件变量是类型为pthread_cond_t的变量。以下两个函数使用条件变量,

#include<pthread.h>
int pthread_cond_wait(pthread_cond_t* cptr, pthread_mutex_t* mptr);//将调用线程投入睡眠并释放调用线程持有的互斥锁,返回时,该线程再次持有该互斥锁
int pthread_cond_signal(pthread_cond_t* cptr);//通过给予条件信号,唤醒在睡眠中的线程
                                //均返回:若成功则为0,若出错则为正的Exxx值

int pthread_cond_broadcast(pthread_cond_t* cptr);//广播唤醒多个相应变量的睡眠中的线程
int pthread_cond_timedwait(pthread_cond_t* cptr, pthread_mutex_t* mptr,//允许线程设置一个阻塞时间的限制
	const struct timespec* abstime);
                                //均返回:若成功则为0,若出错则为正的Exxx值
举例说明,给计数器ndone同时关联一个条件变量和一个互斥锁,
int ndone;
pthread_mutex_t ndone_mutex = PTHREAD_MUTEX_INITIALIZER;
phtread_cond_t ndone_cond = PTHREAD_COND_INITIALIZER;
//通过在持在该互斥锁期间递增该计数器并发送信号到该条件变量,一个线程通知主循环自身即将终止
pthread_mutex_lock(&ndone_mutex);//上锁
ndone++;
pthread_cond_signal(&ndone_cond);//设置条件变量,即给予触发睡眠线程的信号
pthread_mutex_unlock(&ndone_mutex);//解锁
//主循环阻塞在pthread_cond_wait调用中,等待某个即将终止的线程发送信号到与ndone关联的条件变量
while (nlefttoread > 0) {
	while (nconn < maxnconnn&&nlefttoconn>0) {
		/*find a file to read*/
	}

	/*wait for one of the threads to terminate*/
	pthread_mutex_lock(&ndone_mutex);//上锁
	while (ndone == 0)
		pthread_cond_wait(&ndone_cond, &ndone_mutex);//将调用线程投入睡眠并释放调用线程持有的互斥锁,
	                                                 //返回时,该线程再次持有该互斥锁。(原子操作)
	                                                 //进入睡眠的线程,需要其它线程给予信号量唤醒。

	for (i = 0; i < nfiles; i++) {
		if (file[i].f_flags&F_DONE) {
			pthread_join(file[i].f_tid, (void**)&fptr);
			/*update file[i] for terminated thread*/
			...
		}
	}
	pthread_mutex_unlock(&ndone_mutex);
}

以上知识点来均来自steven先生所著UNP卷一(version3),刚开始学习网络编程,如有不正确之处请大家多多指正。