Linux IO复用--select()和poll()
《Linux高性能服务器编程》阅读笔记:
Linux系统中IO复用的系统调用有selece()、poll()和epoll()。IO复用使得程序可以同时监听多个文件描述符的就绪事件的发生,应用场景如:
(1) 服务端程序同时处理监听socket和连接socket
(2) 服务端要同时处理TCP请求和UDP请求
(3) 服务端要同时监听多个端口或者处理多种服务请求
(4) 客户端要同时处理多个socket
(5) 客户端程序要同时处理用户输入和网络连接
不过要清楚的一点是,IO复用虽然能同时监听多个文件描述符,但它是阻塞监听的,当有多个文件描述符同时就绪时,若不采取额外措施,程序只能按照顺序依次处理其中的每一个就绪事件。这实际上也是串行工作,要实现并发,只能使用多进程或多线程等编程手段。
1. select()系统调用
1.1 select()函数原型
select()系统的用途是,在超时时间内,监听用户感兴趣的文件描述符上的可读可写、异常事件的发生。其函数原型为:
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
(1) nfds指定被监听的文件描述符的总数,设置为监听的所有文件描述符中最大值加1,因为文件描述符是从0开始计数的。
(2) readfds、writefds和exceptfds参数分别指向可读、可写、异常事件对应的文件描述符集合。程序员通过这3个参数向该调用传入自己感兴趣的文件描述符。函数返回时,内核将修改它们来通知应用程序哪些文件描述符已经就绪。原型如下:
#define XFD_SETSIZE 256
#define FD_SETSIZE XFD_SETSIZE
typedef long fd_mask;
#define NBBY 8
#define NFDBITS (sizeof(fd_mask) * NBBY)
#define howmany(x,y) (((x)+((y)-1))/(y))
#if defined(BSD) && BSD < 198911
typedef struct fd_set
{
fd_mask fds_bits[howmany(FD_SETSIZE, NFDBITS)];
} fd_set;
#endif
fd_set结构体仅包含一个long型数组,根据推导可见该数组为
long fds_bits[8];
fds_bit共占据32字节,即256位。该数组的每个元素的每一位(bit)标记一个文件描述符。fd_set能容纳的文件描述符由FD_SETSIZE指定,显然这限制了select()能同时处理的文件描述符的总数。该系统调用还提供了一些列宏方便程序员实现位操作:
void FD_CLR(int fd, fd_set *set); //清零set中的fd位
int FD_ISSET(int fd, fd_set *set); //测试set中的fd位是否被设置
void FD_SET(int fd, fd_set *set); //设置fd中的fd位
void FD_ZERO(fd_set *set); //清零set中所有位
(3) timeout参数设置函数的超时时间。它是一个timeval的普通(非const)指针,内核可以修改此参数以告诉应用程序函数阻塞等待了多久。不过内核返回的该值不能完全信任,比如调用失败时timeout的值是不确定的。timeval结构体定义如下:
struct timeval {
long tv_sec; /* seconds */
long tv_usec; /* microseconds */
};
当tv_sec(秒)和tv_usec(微秒)都设置为0则select()立即返回,当timeout为NULL则select()将会一直阻塞直到某个文件描描述符就绪。
selece()执行成功时返回就绪的文件描述符的总数;若在超时时间内没有任何文件描述符就绪,select()将返回0;失败将返回-1并设置errno。若在select()阻塞等待期间程序收到信号,将立即返回-1并设置errno为EINTR。
1.2 socket文件描述符就绪条件
以Linux网络编程中,多路复用判断socket文件描述符可读的依据是:
(1) socket内核接收缓冲区中字节数>=其低水位标记SO_RCVLOWAT时,此时程序可以无阻塞地读该socket,返回读取到的字节数(>0)
(2) socket通信的对端关闭连接,此时对该socket的读操作将返回0表示对端关闭
(3) 监听socket上有新的连接请求
(4) socket上有未处理的错误,此时可以使用getsockopt()读取和清除该错误(使用SO_ERROR标记)
socket文件描述符可写:
(1) socket内核发送缓冲区的空闲区域大于或等于其低水位标记SO_SNDLOWAT,此时程序可以无阻塞的写该socket,返回写入的字节数(>0)
(2) socket的写操作被关闭(使用shotdown(fd, SHUT_WR))后再对socket写,会触发一个SIGPIPE信号
(3) socket使用非阻塞connect()连接成功或者失败(超时)之后,对于后者将会收到RST报文段,若收到RST报文段后继续往该socket写则会触发SIGPIPE信号
(4) socket上未处理的错误
socket文件描述符异常:
socket上接收到带外数据
1.3 利用select()同时收发普通数据和带外数据、对端关闭
#include <stdio.h>
#include <stdlib.h>
#include <arpa/inet.h>
#include <libgen.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <sys/select.h>
#define ERRP(con, ret, ...) do \
{ \
if (con) \
{ \
perror(__VA_ARGS__); \
ret; \
} \
}while(0)
#define BUFSIZE 1024
static const char* ip = "192.168.239.136";
static int port = 9660;
int main(void)
{
int socket_fd = socket(AF_INET, SOCK_STREAM, 0);
ERRP(socket_fd <= 0, return -1, "socket");
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);
int ret = bind(socket_fd, (struct sockaddr* )&address, sizeof(address));
ERRP(ret < 0, goto ERR1, "connect");
ret = listen(socket_fd, 5);
ERRP(ret < 0, goto ERR1, "listen");
struct sockaddr_in client;
socklen_t client_addrlen = sizeof(client);
printf("Wait guest...\n\n");
int connfd = accept(socket_fd, (struct sockaddr* )&client, &client_addrlen);
ERRP(connfd < 0, goto ERR1, "accept");
printf("connect success: %s:%d\n", inet_ntoa(client.sin_addr), ntohs(client.sin_port));
char buf[BUFSIZE] = {};
fd_set read_fds;
fd_set exception_fds;
FD_ZERO(&read_fds);
FD_ZERO(&exception_fds);
struct timeval timeout;
unsigned int cnt = 0;
while (1)
{
bzero(buf, BUFSIZE);
//select()的参数在每次select()函数的返回会被内核修改,所以这里需要重新设置
FD_SET(connfd, &read_fds); //将connfd加入就绪读监听集合
FD_SET(connfd, &exception_fds); //将connfd加入异常监听集合
timeout.tv_usec = 0; //超时时间为4s
timeout.tv_sec = 4;
ret = select(connfd + 1, &read_fds, NULL, &exception_fds, &timeout);
ERRP(ret < 0, goto ERR2, "select");
if (FD_ISSET(connfd, &read_fds))
{
ret = recv(connfd, buf, sizeof(buf) - 1, 0); //recv返回0表示对端已经关闭
ERRP(ret < 0, goto ERR2, "recv normal data");
if (ret == 0)
{
printf("guest exit...\n");
break;
}
printf("get %d bytes of normal data: %s\n", ret, buf);
}
else if (FD_ISSET(connfd, &exception_fds))
{
ret = recv(connfd, buf, sizeof(buf) - 1, MSG_OOB);
ERRP(ret < 0, goto ERR2, "recv OOB data");
if (ret == 0)
{
printf("guest exit...\n");
break;
}
printf("get %d bytes of OOB data: %s\n", ret, buf);
}
else if (ret == 0) //select()返回0表示超时返回
{
printf("time out %d\n", ++cnt);
}
}
ERR2:
close(connfd);
ERR1:
close(socket_fd);
return 0;
}
2. poll()系统调用
2.1 poll()函数原型
poll()系统调用与select()类似,也是在指定的时间内轮询一定数量的文件描述符,监听其是否就绪。
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
(1) fds参数是一个pollfd结构体类型的指针,它指定所有程序员感兴趣的文件描述符上发生的可读、可写、异常等事件。既然它是一个结构体指针,就可以指向该类型的数组。
pollfd结构体的原型为:
struct pollfd {
int fd; /* 文件描述符 */
short events; /* 注册的事件 */
short revents; /* 实际发生的事件,由内核填充 */
};
fd成员指定文件描述符。event成员告诉内核要监听fd上的哪些事件,它可以是一系列事件的按位或。revents成员由内核修改,以通知应用程序fd上实际发生了哪些事件。event的取值为:
上面事件选项中:
a. POLLRDNORM(普通数据可读)、POLLRDBAND(优先级带数据可读)和POLLWRNORM(普通数据可写)、POLLWRBAND(优先级带数据可写)将POLLIN(数据可读)和POLLOUT(数据可写)划分得更明显,以区分优先级带数据和普通数据,但是Linux并不完全支持。
b. 一般应用程序调用recv()时,要判断接收到的是有效数据还是对端关闭连接后触发的是根据recv()的返回值(如上面的select()示例程序),在poll()系统调用中,有更直接的方法,监听描述符的POLLRDHUP事件即监听对端关闭事件,不过需要在代码开始处定义”_GNU_SOURCE”
(2) fds数组成员的的个数由参数nfds指定(typedef unsigned long int nfds_t;)。显然,这个比select()的设计要灵活一点: 用户可以监测任意多数目文件描述符,但是poll()的实现也是依靠轮询的,从效率上来讲跟select()的实现是一致的。
(3) timeout参数指定函数的超时事件,单位为毫秒。当timeout为-1时,poll调用将一直阻塞直到监听的目标事件发生;当timeout为0时,poll()调用立即返回。
(4) poll()的返回值跟select()的返回值含义相同。
2.2 poll()同时收发普通数据和带外数据、对端关闭
利用poll()实现跟上述select()一样功能的代码: 监听连接关闭事件、接收普通数据事件和接收带外数据事件。
#define _GNU_SOURCE
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/select.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <poll.h>
#define ERRP(con, ret, ...) do \
{ \
if (con) \
{ \
perror(__VA_ARGS__); \
ret; \
} \
}while(0)
#define BUFSIZE 1024
static const char* ip = "192.168.239.136";
static int port = 9660;
int main(void)
{
int socket_fd = socket(AF_INET, SOCK_STREAM, 0);
ERRP(socket_fd <= 0, return -1, "socket");
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);
int ret = bind(socket_fd, (struct sockaddr* )&address, sizeof(address));
ERRP(ret < 0, goto ERR1, "connect");
ret = listen(socket_fd, 5);
ERRP(ret < 0, goto ERR1, "listen");
struct sockaddr_in client;
socklen_t client_addrlen = sizeof(client);
printf("Wait guest...\n\n");
int connfd = accept(socket_fd, (struct sockaddr* )&client, &client_addrlen);
ERRP(connfd < 0, goto ERR1, "accept");
printf("connect success: %s:%d\n", inet_ntoa(client.sin_addr), ntohs(client.sin_port));
char buf[BUFSIZE] = {};
unsigned int cnt = 0;
struct pollfd fds;
fds.fd = connfd;
fds.events = (POLLIN | POLLRDHUP | POLLPRI);
while (1)
{
fds.revents = 0;
ret = poll(&fds, 1, 4000);
ERRP(ret < 0, goto ERR2, "poll");
//注意先判断对端退出事件
if (fds.revents & POLLRDHUP)
{
//printf("exit\n");
break;
}
else if (fds.revents & POLLIN)
{
bzero(buf, BUFSIZE);
ret = recv(connfd, buf, sizeof(buf) - 1, 0);
ERRP(ret < 0, goto ERR2, "recv normal data");
printf("get %d bytes of normal data: %s\n", ret, buf);
}
else if (fds.revents & POLLPRI)
{
bzero(buf, BUFSIZE);
ret = recv(connfd, buf, sizeof(buf) - 1, MSG_OOB);
ERRP(ret < 0, goto ERR2, "recv OOB data");
printf("get %d bytes of OOB data: %s\n", ret, buf);
//printf ("fds.revents = %d, POLLPRI | POLLRDHUP\n", fds.revents);
}
if (ret == 0)
{
printf ("Time out: %d\n", ++cnt);
}
}
ERR2:
close(connfd);
ERR1:
close(socket_fd);
return 0;
}
在代码测试阶段,发现客户端关闭连接时,服务端接收的事件不仅POLLRDHUP(值8192),还有POLLIN(值1),即8193。如上代码,先判断若接收到POLLRDHUP后服务端程序退出,再判断若接收到POLLIN则服务端就去读数据。若二者的判断顺序颠倒,因为revent会一直等于POLLRDHUP,那么判断POLLIN的执行分支代码将会一直得到执行。
下面是测试程序的客户端代码,适用于上述两个服务端测试程序:
#include <stdio.h>
#include <stdlib.h>
#include <arpa/inet.h>
#include <libgen.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#define ERRP(con, ret, ...) do \
{ \
if (con) \
{ \
perror(__VA_ARGS__); \
ret; \
} \
}while(0)
static char isstop = 0;
static void handler(int sig)
{
printf("handler, sig = %d\n", sig);
}
int main(int argc, char** argv)
{
if (argc <= 2)
{
printf("usage: %s ip_address port number backlog\n", basename(argv[0]));
return -1;
}
signal(SIGPIPE, handler);
const char* ip = argv[1];
int port = atoi(argv[2]);
int backlog = 5;
int socket_fd = socket(AF_INET, SOCK_STREAM, 0);
ERRP(socket_fd <= 0, return -1, "socket");
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);
int ret = connect(socket_fd, (struct sockaddr* )&address, sizeof(address));
ERRP(ret < 0, return -1, "connect");
const char* oob_data = "h";
const char* normal_data = "1234";
printf("Enter to send normal_data\n");
getchar();
send(socket_fd, normal_data, strlen(normal_data), 0);
printf("Enter to send normal_data\n");
getchar();
send(socket_fd, normal_data, strlen(normal_data), 0);
printf("Enter to send oob_data\n");
getchar();
send(socket_fd, oob_data, strlen(oob_data), MSG_OOB);
printf("Enter to send normal_data\n");
getchar();
send(socket_fd, normal_data, strlen(normal_data), 0);
printf("Enter to send normal_data\n");
getchar();
send(socket_fd, normal_data, strlen(normal_data), 0);
printf("Enter to send oob_data\n");
getchar();
send(socket_fd, oob_data, strlen(oob_data), MSG_OOB);
printf("Enter to close connect...");
getchar();
close(socket_fd);
getchar();
return 0;
}
上一篇: 计算机组成原理期末复习题笔记整合
下一篇: 进程在内存中的结构