欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Linux IO复用--select()和poll()

程序员文章站 2022-06-13 14:02:51
...

《Linux高性能服务器编程》阅读笔记:

  Linux系统中IO复用的系统调用有selece()、poll()和epoll()。IO复用使得程序可以同时监听多个文件描述符的就绪事件的发生,应用场景如:
  (1) 服务端程序同时处理监听socket和连接socket
  (2) 服务端要同时处理TCP请求和UDP请求
  (3) 服务端要同时监听多个端口或者处理多种服务请求
  (4) 客户端要同时处理多个socket
  (5) 客户端程序要同时处理用户输入和网络连接

  不过要清楚的一点是,IO复用虽然能同时监听多个文件描述符,但它是阻塞监听的,当有多个文件描述符同时就绪时,若不采取额外措施,程序只能按照顺序依次处理其中的每一个就绪事件。这实际上也是串行工作,要实现并发,只能使用多进程或多线程等编程手段

1. select()系统调用

1.1 select()函数原型

  select()系统的用途是,在超时时间内,监听用户感兴趣的文件描述符上的可读可写、异常事件的发生。其函数原型为:

#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

  (1) nfds指定被监听的文件描述符的总数,设置为监听的所有文件描述符中最大值加1,因为文件描述符是从0开始计数的。
  (2) readfds、writefds和exceptfds参数分别指向可读、可写、异常事件对应的文件描述符集合。程序员通过这3个参数向该调用传入自己感兴趣的文件描述符。函数返回时,内核将修改它们来通知应用程序哪些文件描述符已经就绪。原型如下:

#define XFD_SETSIZE     256
#define FD_SETSIZE      XFD_SETSIZE

typedef long fd_mask;
#define NBBY    8
#define NFDBITS (sizeof(fd_mask) * NBBY)

#define howmany(x,y)    (((x)+((y)-1))/(y))

#if defined(BSD) && BSD < 198911
typedef struct fd_set 
{
    fd_mask fds_bits[howmany(FD_SETSIZE, NFDBITS)];
} fd_set;
#endif

  fd_set结构体仅包含一个long型数组,根据推导可见该数组为

long fds_bits[8];

  fds_bit共占据32字节,即256位。该数组的每个元素的每一位(bit)标记一个文件描述符。fd_set能容纳的文件描述符由FD_SETSIZE指定,显然这限制了select()能同时处理的文件描述符的总数。该系统调用还提供了一些列宏方便程序员实现位操作:

void FD_CLR(int fd, fd_set *set);       //清零set中的fd位        
int  FD_ISSET(int fd, fd_set *set);     //测试set中的fd位是否被设置
void FD_SET(int fd, fd_set *set);       //设置fd中的fd位
void FD_ZERO(fd_set *set);              //清零set中所有位

  (3) timeout参数设置函数的超时时间。它是一个timeval的普通(非const)指针,内核可以修改此参数以告诉应用程序函数阻塞等待了多久。不过内核返回的该值不能完全信任,比如调用失败时timeout的值是不确定的。timeval结构体定义如下:

struct timeval {
   long    tv_sec;         /* seconds */
   long    tv_usec;        /* microseconds */
};

  当tv_sec(秒)和tv_usec(微秒)都设置为0则select()立即返回,当timeout为NULL则select()将会一直阻塞直到某个文件描描述符就绪。

  selece()执行成功时返回就绪的文件描述符的总数;若在超时时间内没有任何文件描述符就绪,select()将返回0;失败将返回-1并设置errno。若在select()阻塞等待期间程序收到信号,将立即返回-1并设置errno为EINTR

1.2 socket文件描述符就绪条件

  以Linux网络编程中,多路复用判断socket文件描述符可读的依据是:
  (1) socket内核接收缓冲区中字节数>=其低水位标记SO_RCVLOWAT时,此时程序可以无阻塞地读该socket,返回读取到的字节数(>0)
  (2) socket通信的对端关闭连接,此时对该socket的读操作将返回0表示对端关闭
  (3) 监听socket上有新的连接请求
  (4) socket上有未处理的错误,此时可以使用getsockopt()读取和清除该错误(使用SO_ERROR标记)

  socket文件描述符可写:
  (1) socket内核发送缓冲区的空闲区域大于或等于其低水位标记SO_SNDLOWAT,此时程序可以无阻塞的写该socket,返回写入的字节数(>0)
  (2) socket的写操作被关闭(使用shotdown(fd, SHUT_WR))后再对socket写,会触发一个SIGPIPE信号
  (3) socket使用非阻塞connect()连接成功或者失败(超时)之后,对于后者将会收到RST报文段,若收到RST报文段后继续往该socket写则会触发SIGPIPE信号
  (4) socket上未处理的错误

  socket文件描述符异常:
  socket上接收到带外数据

1.3 利用select()同时收发普通数据和带外数据、对端关闭

#include <stdio.h>
#include <stdlib.h>
#include <arpa/inet.h>
#include <libgen.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <sys/select.h>

#define ERRP(con, ret, ...) do                              \
{                                                           \
    if (con)                                                \
    {                                                       \
        perror(__VA_ARGS__);                                \
        ret;                                                \
    }                                                       \
}while(0)

#define BUFSIZE 1024
static const char* ip = "192.168.239.136";
static int port = 9660;

int main(void)
{
    int socket_fd = socket(AF_INET, SOCK_STREAM, 0);
    ERRP(socket_fd <= 0, return -1, "socket");

    struct sockaddr_in address;
    bzero(&address, sizeof(address));
    address.sin_family = AF_INET;
    inet_pton(AF_INET, ip, &address.sin_addr);
    address.sin_port = htons(port);
    int ret = bind(socket_fd, (struct sockaddr* )&address, sizeof(address));
    ERRP(ret < 0, goto ERR1, "connect");

    ret = listen(socket_fd, 5);
    ERRP(ret < 0, goto ERR1, "listen");

    struct sockaddr_in client;
    socklen_t client_addrlen = sizeof(client);
    printf("Wait guest...\n\n");
    int connfd = accept(socket_fd, (struct sockaddr* )&client, &client_addrlen);
    ERRP(connfd < 0, goto ERR1, "accept");
    printf("connect success: %s:%d\n", inet_ntoa(client.sin_addr), ntohs(client.sin_port));

    char buf[BUFSIZE] = {};
    fd_set read_fds;
    fd_set exception_fds;
    FD_ZERO(&read_fds);
    FD_ZERO(&exception_fds);
    struct timeval timeout;
    unsigned int cnt = 0;

    while (1)
    {
        bzero(buf, BUFSIZE);

        //select()的参数在每次select()函数的返回会被内核修改,所以这里需要重新设置
        FD_SET(connfd, &read_fds);      //将connfd加入就绪读监听集合
        FD_SET(connfd, &exception_fds); //将connfd加入异常监听集合
        timeout.tv_usec = 0;            //超时时间为4s
        timeout.tv_sec = 4;

        ret = select(connfd + 1, &read_fds, NULL, &exception_fds, &timeout);
        ERRP(ret < 0, goto ERR2, "select");

        if (FD_ISSET(connfd, &read_fds))
        {
            ret = recv(connfd, buf, sizeof(buf) - 1, 0);    //recv返回0表示对端已经关闭
            ERRP(ret < 0, goto ERR2, "recv normal data");
            if (ret == 0)
            {
                printf("guest exit...\n");
                break;
            }
            printf("get %d bytes of normal data: %s\n", ret, buf);
        }
        else if (FD_ISSET(connfd, &exception_fds))
        {
            ret = recv(connfd, buf, sizeof(buf) - 1, MSG_OOB);
            ERRP(ret < 0, goto ERR2, "recv OOB data");
            if (ret == 0)
            {
                printf("guest exit...\n");
                break;
            }
            printf("get %d bytes of OOB data: %s\n", ret, buf);
        }
        else if (ret == 0)  //select()返回0表示超时返回
        {
            printf("time out %d\n", ++cnt);
        }
    }

ERR2:
    close(connfd);
ERR1:
    close(socket_fd);  

    return 0;
}

2. poll()系统调用

2.1 poll()函数原型

  poll()系统调用与select()类似,也是在指定的时间内轮询一定数量的文件描述符,监听其是否就绪。

#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);

  (1) fds参数是一个pollfd结构体类型的指针,它指定所有程序员感兴趣的文件描述符上发生的可读、可写、异常等事件。既然它是一个结构体指针,就可以指向该类型的数组。

pollfd结构体的原型为:
struct pollfd {
   int   fd;         /* 文件描述符 */
   short events;     /* 注册的事件 */
   short revents;    /* 实际发生的事件,由内核填充 */
};

  fd成员指定文件描述符。event成员告诉内核要监听fd上的哪些事件,它可以是一系列事件的按位或。revents成员由内核修改,以通知应用程序fd上实际发生了哪些事件。event的取值为:
Linux IO复用--select()和poll()

  上面事件选项中:
  a. POLLRDNORM(普通数据可读)、POLLRDBAND(优先级带数据可读)和POLLWRNORM(普通数据可写)、POLLWRBAND(优先级带数据可写)将POLLIN(数据可读)和POLLOUT(数据可写)划分得更明显,以区分优先级带数据和普通数据,但是Linux并不完全支持。
  b. 一般应用程序调用recv()时,要判断接收到的是有效数据还是对端关闭连接后触发的是根据recv()的返回值(如上面的select()示例程序),在poll()系统调用中,有更直接的方法,监听描述符的POLLRDHUP事件即监听对端关闭事件,不过需要在代码开始处定义”_GNU_SOURCE

  (2) fds数组成员的的个数由参数nfds指定(typedef unsigned long int nfds_t;)。显然,这个比select()的设计要灵活一点: 用户可以监测任意多数目文件描述符,但是poll()的实现也是依靠轮询的,从效率上来讲跟select()的实现是一致的。

  (3) timeout参数指定函数的超时事件,单位为毫秒。当timeout为-1时,poll调用将一直阻塞直到监听的目标事件发生;当timeout为0时,poll()调用立即返回。

  (4) poll()的返回值跟select()的返回值含义相同。

2.2 poll()同时收发普通数据和带外数据、对端关闭

  利用poll()实现跟上述select()一样功能的代码: 监听连接关闭事件、接收普通数据事件和接收带外数据事件。

#define _GNU_SOURCE

#include <string.h>  
#include <stdio.h>  
#include <stdlib.h>  
#include <unistd.h>  
#include <sys/select.h>  
#include <sys/time.h>  
#include <sys/socket.h>  
#include <netinet/in.h>  
#include <arpa/inet.h>  
#include <poll.h>

#define ERRP(con, ret, ...) do                              \
{                                                           \
    if (con)                                                \
    {                                                       \
        perror(__VA_ARGS__);                                \
        ret;                                                \
    }                                                       \
}while(0)

#define BUFSIZE 1024
static const char* ip = "192.168.239.136";
static int port = 9660;

int main(void)
{
    int socket_fd = socket(AF_INET, SOCK_STREAM, 0);
    ERRP(socket_fd <= 0, return -1, "socket");

    struct sockaddr_in address;
    bzero(&address, sizeof(address));
    address.sin_family = AF_INET;
    inet_pton(AF_INET, ip, &address.sin_addr);
    address.sin_port = htons(port);
    int ret = bind(socket_fd, (struct sockaddr* )&address, sizeof(address));
    ERRP(ret < 0, goto ERR1, "connect");

    ret = listen(socket_fd, 5);
    ERRP(ret < 0, goto ERR1, "listen");

    struct sockaddr_in client;
    socklen_t client_addrlen = sizeof(client);
    printf("Wait guest...\n\n");
    int connfd = accept(socket_fd, (struct sockaddr* )&client, &client_addrlen);
    ERRP(connfd < 0, goto ERR1, "accept");
    printf("connect success: %s:%d\n", inet_ntoa(client.sin_addr), ntohs(client.sin_port));

    char buf[BUFSIZE] = {};
    unsigned int cnt = 0;

    struct pollfd fds;
    fds.fd = connfd;
    fds.events = (POLLIN | POLLRDHUP | POLLPRI);

    while (1)
    {

        fds.revents = 0;

        ret = poll(&fds, 1, 4000);
        ERRP(ret < 0, goto ERR2, "poll");

        //注意先判断对端退出事件
        if (fds.revents & POLLRDHUP)
        {

            //printf("exit\n");
            break;
        }
        else if (fds.revents & POLLIN)
        {
           bzero(buf, BUFSIZE);
           ret = recv(connfd, buf, sizeof(buf) - 1, 0);
           ERRP(ret < 0, goto ERR2, "recv normal data");
           printf("get %d bytes of normal data: %s\n", ret, buf);
        }
        else if (fds.revents & POLLPRI)
        {
            bzero(buf, BUFSIZE);
            ret = recv(connfd, buf, sizeof(buf) - 1, MSG_OOB);
            ERRP(ret < 0, goto ERR2, "recv OOB data");
            printf("get %d bytes of OOB data: %s\n", ret, buf);
            //printf ("fds.revents = %d, POLLPRI | POLLRDHUP\n", fds.revents);
        }

        if (ret == 0)    
        {
             printf ("Time out: %d\n", ++cnt);
        }   
    }

ERR2:
    close(connfd);
ERR1:
    close(socket_fd);  

    return 0;
}

  在代码测试阶段,发现客户端关闭连接时,服务端接收的事件不仅POLLRDHUP(值8192),还有POLLIN(值1),即8193。如上代码,先判断若接收到POLLRDHUP后服务端程序退出,再判断若接收到POLLIN则服务端就去读数据。若二者的判断顺序颠倒,因为revent会一直等于POLLRDHUP,那么判断POLLIN的执行分支代码将会一直得到执行。

  下面是测试程序的客户端代码,适用于上述两个服务端测试程序:

#include <stdio.h>
#include <stdlib.h>
#include <arpa/inet.h>
#include <libgen.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>

#define ERRP(con, ret, ...) do                              \
{                                                           \
    if (con)                                                \
    {                                                       \
        perror(__VA_ARGS__);                                \
        ret;                                                \
    }                                                       \
}while(0)

static char isstop = 0;

static void handler(int sig)
{
    printf("handler, sig = %d\n", sig);
}

int main(int argc, char** argv)
{
    if (argc <= 2)
    {
        printf("usage: %s ip_address port number backlog\n", basename(argv[0]));
        return -1;
    }

    signal(SIGPIPE, handler);

    const char* ip = argv[1];
    int port = atoi(argv[2]);
    int backlog = 5;

    int socket_fd = socket(AF_INET, SOCK_STREAM, 0);
    ERRP(socket_fd <= 0, return -1, "socket");

    struct sockaddr_in address;
    bzero(&address, sizeof(address));
    address.sin_family = AF_INET;
    inet_pton(AF_INET, ip, &address.sin_addr);
    address.sin_port = htons(port);
    int ret = connect(socket_fd, (struct sockaddr* )&address, sizeof(address));
    ERRP(ret < 0, return -1, "connect");

    const char* oob_data = "h";
    const char* normal_data = "1234";

    printf("Enter to send normal_data\n");
    getchar();  
    send(socket_fd, normal_data, strlen(normal_data), 0);

    printf("Enter to send normal_data\n");
    getchar();  
    send(socket_fd, normal_data, strlen(normal_data), 0);

    printf("Enter to send oob_data\n");
    getchar();  
    send(socket_fd, oob_data, strlen(oob_data), MSG_OOB);

    printf("Enter to send normal_data\n");
    getchar();  
    send(socket_fd, normal_data, strlen(normal_data), 0);

    printf("Enter to send normal_data\n");
    getchar();  
    send(socket_fd, normal_data, strlen(normal_data), 0);

    printf("Enter to send oob_data\n");
    getchar();  
    send(socket_fd, oob_data, strlen(oob_data), MSG_OOB);

    printf("Enter to close connect...");
    getchar();
    close(socket_fd);
    getchar();

    return 0;
}