欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

IO多路复用:select、poll、epoll

程序员文章站 2022-06-13 12:49:57
...

一、同步异步、阻塞非阻塞的概念区分

首先,一个 输入操作通常包括两个不同的阶段:

(1)等待数据准备好

(2)从内核向进程复制数据

对于一个套接字上的输入操作,第一步通常涉及等待数据从网络中到达。当所等待分组到达时,它被复制到内核中的某个缓冲区。第二步就是把数据从缓冲区复制到应用进程缓冲区。

1.同步与异步
同步和异步关注的是消息通信机制 (synchronous communication/ asynchronous communication)
所谓同步,就是在发出一个调用时,在没有得到结果之前,该调用就不返回。但是一旦调用返回,就得到返回值了。
换句话说,就是由调用者主动等待这个调用的结果。

而异步则是相反,调用在发出之后,这个调用就直接返回了,所以没有返回结果。换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果。而是在调用发出后,被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用。

2.阻塞与非阻塞
阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态.

阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。
非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。

举个例子来帮助理解同步异步、阻塞非阻塞:

假如你用水壶烧水。

(1)你把普通水壶放在电磁炉上,站在一旁等水开。(同步阻塞)

(2)你把普通水壶放在电磁炉上,然后去客厅看电视了,时不时去厨房看看水开了没有。(同步非阻塞)

(3)你把煮开时会发出提示声的改进水壶放在电磁炉上,站在一旁等水开。(异步阻塞)

(4)你把煮开时会发出提示声的改进水壶放在电磁炉上,然后去客厅看电视了,水壶响之前不再去看它了,响了再去拿壶。(异步非阻塞)

对于访问数据,阻塞非阻塞是说针对在得知访问的数据是否就绪这一问题,进程/线程是否需要等待(在此期间不能做其它事),同步异步是说进程/线程是否需要主动读写数据,同步则是需要主动读写数据,在读写数据的过程中还是会阻塞,异步则是只需要I/O操作完成的通知,进程/线程并不主动读写数据,由操作系统内核完成数据的读写。

二、Unix下的5种IO模型

  • 阻塞式IO
  • 非阻塞式IO
  • IO复用
  • 信号驱动式IO
  • 异步IO

    POSIX把同步IO和异步IO定义如下:

  • 同步IO操作导致请求进程阻塞,直到IO操作完成。
  • 异步IO操作不导致请求进程阻塞。

这五种IO模型里前四种都属于同步IO。

(一)阻塞式IO模型
IO多路复用:select、poll、epoll

最流行的IO模型是阻塞式IO,默认情形下,所有套接字都是阻塞的。

如图,此处recvfrom视为一个系统调用用以IO操作。当进程调用recvfrom时,内核就开始了IO的第一个阶段:准备数据。对于网络IO来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的UDP包),这个时候内核就要等待足够的数据到来。而在用户进程这边,整个进程会被阻塞。当内核一直等到数据准备好了,它就会将数据从内核中拷贝到用户空间,然后内核返回结果,用户进程才解除阻塞的状态,开始处理数据。
所以,阻塞式 IO的特点就是在IO执行的两个阶段都被block了。这导致用户在发起IO请求时,不能做任何事情,对CPU的资源利用率不够。

(二)非阻塞式IO模型
IO多路复用:select、poll、epoll

进程把一个套接字设置成非阻塞是在通知内核:当所请求的IO操作非得把本进程投入睡眠才能完成时,不要把本进程投入睡眠,而是返回一个错误。

也就是说,当用户进程发出recvfrom操作时,如果内核中的数据还没有准备好,那么它并不会阻塞用户进程,而是立刻返回一个EWOULDBLOCK错误。从用户进程角度讲 ,它发起一个recvfrom操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送recvfrom操作。一旦某次再次调用recvfrom时内核中的数据准备好了,那么内核马上就将数据拷贝到了用户空间,然后返回。

所以,整个IO请求的过程中,虽然用户线程每次发起IO请求后可以立即返回,但是为了等到数据,仍需要不断地轮询、重复请求,消耗了大量的CPU的资源。

(三)IO复用模型
IO多路复用:select、poll、epoll

有了IO复用,我们就可以调用select或poll或epoll,阻塞在这三个函数之上,而不是阻塞在真正的IO系统调用上。就拿select来说吧,进程阻塞于select调用 ,此时内核会监视所有select负责的套接字,当任何一个socket中的数据准备好了,select就会返回可读条件,之后用户进程就可以调用recvfrom直接让内核将数据拷贝到用户空间。使用select需要两个而不是单个系统调用,使用select的优势在于可以等待多个描述符就绪。所以,如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。

(四)信号驱动式IO模型
IO多路复用:select、poll、epoll

用户进程首先开启套接字的信号驱动式IO功能,并通过sigaction系统调用安装一个信号处理函数。该系统调用将立即返回,用户进程可以继续做其它的事,也就是说它没有被阻塞。当数据报准备好供读取时,内核就为该进程产生一个SIGIO信号。用户进程随后在信号处理函数中调用recvfrom直接让内核将数据拷贝到用户空间。

(五)异步IO模型
IO多路复用:select、poll、epoll

用户进程发起aio_read操作之后,立刻就可以开始做其它的事。在这期间,内核会在整个操作(包括等待数据和将数据从内核拷贝到用户空间)完成后给用户进程发送一个信号来通知。这种模型与信号驱动式IO模型的主要区别在于:信号驱动式IO是由内核通知用户进程何时可以启动一个IO操作,而异步IO模型是由内核通知用户进程IO操作何时完成了。

最后贴出5种IO模型的直观比较:
IO多路复用:select、poll、epoll

三、select函数

select函数允许进程指示内核等待多个事件中的任何一个发生,并只在有一个或多个事件发生或经历一段指定的时间后才唤醒它。

(一)select函数及参数说明:

#include <sys/select.h>  
#include <sys/time.h>  
int select(int maxfdp1, fd_set *readfds, fd_set *writefds,  
           fd_set *exceptfds, struct timeval *timeout);  
//返回:若有就绪描述符则返回就绪描述符的数目,若超时返回0,若出错返回-1  

1.timeout:告知内核等待指定描述符中任何一个就绪花费的最长时间,其timeval结构用于指定秒数和微妙数。

//timeval结构:  
struct timeval  
{  
    long    tv_sec;     /* seconds */  
    long    tv_usec;    /* microseconds */  
};  

这个参数有以下三种可能:

timeout== NULL:永远等待下去,即仅当一个描述符准备好I/O时才返回。

timeout->tv_sec != 0 || tvptr->tv_usec !=0:等待一段固定的时间,超时返回0;在这段时间内如果有描述符准备好就返回。

timeout->tv_sec == 0 && tvptr->tv_usec == 0:根本不等待,即检查描述符后立即返回,这称为轮询(非阻塞式I/O就是轮询)。

2.中间的三个参数:指定要让内核测试读、写、异常的描述符,若对某一个不感兴趣可置为NULL。

这三个参数都是值-结果参数,调用函数时,用于指定所关心的描述符的值;函数返回时,结果将指示哪些描述符已经就绪。该函数返回后,使用FD_ISSET宏来测试fd_set数据类型中的描述符。描述符集内任何与未就绪描述符对应的位返回时均清为0。为此,每次重新调用select函数时,我们都得再次把所有描述符集内所关心的位均置为1.

select使用描述符集,通常是一个整数数组,其中每个整数的一位对应一个描述符。举例来说,假设使用32位整数,那么该数组的第一个元素对应于描述符0~31,第二个元素对应于描述符32~63,依此类推。所有这些实现细节都与应用程序无关,它们隐藏在为fd_set的数据类型和以下四个宏中:

FD_CLR(int fd, fd_set *set);  //关闭fd_set中的fd位  
FD_ISSET(int fd, fd_set *set); //测试该位是否打开,如果为1则该位对应描述符就绪  
FD_SET(int fd, fd_set *set);  //打开该fd位  
FD_ZERO(fd_set *set);         //清空所有位 

如下打开描述符1、4位:

fd_set rset;  
FD_ZERO(&rset);  //清空所有,每次调用select都要清空为0  
FD_SET(1,&rset);  //打开描述符1 
FD_SET(4,&rset);  //打开描述符4

3.maxfdp1参数指定待测试的描述符的个数,其值为最大待测试描述符加1。例如上例打开1、4描述符,那么这里maxfdp1值为5。

(二)select的特点

1.最大并发数限制:因为一个进程所打开的 FD (文件描述符)是有限制的,由 FD_SETSIZE 设置,默认值是 1024,因此 Select 模型的最大并发数就被相应限制了。如果要改变FD_SIZE的大小需要重新编译内核。

2.效率问题:select 每次调用都会线性扫描全部的 FD 集合,花费时间为O(n),这样效率就会呈现线性下降,即使将 FD_SETSIZE 改大其性能也会很差。
3. 内核/用户空间内存拷贝问题:select 采取了内存拷贝方法让内核把 FD 消息通知给用户空间。

4.事件集:select的参数类型fd_set没有将文件描述符和事件绑定,它仅仅是一个文件描述符的集合,因此select需要提供3个“值-结果”类型的参数分别传入和输出可读、可写和异常等事件(调用该函数时,指定所关心的描述符的值,函数返回时,结果将指示哪些描述符已经就绪),也就是select函数会修改指针readset、writeset、exceptset所指向的描述符集。

一方面,使得select不能处理更多类型的事件,所能处理的事件类型只有读写异常三类;

另一方面,描述符集内任何与未就绪描述符对应的位返回时都会被清空,因此每次重新调用select时,都需要再次把所有的描述符集内所关心的位置为1。

5.select函数的定时是由函数的最后一个参数决定的,它是一个timeval结构体,用于指定这段时间的秒数和微秒数。

四、poll函数

(一)poll函数及其参数说明

#include <poll.h>  
int poll(struct pollfd *fds, nfds_t nfds, int timeout);  
//返回:如有就绪描述符就返回其数目,超时返回0,若出错返回-1.

1.第一个参数是指向一个结构数组第一个元素的指针。每个元素是一个pollfd结构,用于指定测试某个给定描述符fd的条件。

//pollfd结构体  
struct pollfd  
{  
    int   fd;         /* file descriptor :要测试的描述符*/  
    short events;     /* requested events: 要测试的事件 */  
    short revents;    /* returned events :  返回该描述符的状态*/  
}; 

pollfd结构的成员events和revents避免了使用值-结果参数。这点与select不同。

用于指定这两个成员的一些常值如下:
IO多路复用:select、poll、epoll

2.第二个参数nfds:指定第一个参数结构数组中元素的个数。

3.第三个参数timeout:指定poll函数返回前等待多长时间。
IO多路复用:select、poll、epoll

INFTIM常值被定义为一个负值。

当发生错误时,poll函数的返回值为-1,若定时器到时之前没有任何描述符就绪,则返回0,否则返回就绪描述符的个数,即revents成员值非0的描述符个数。

如果我们不关心某个特定描述符,那么可以把与它对应的pollfd结构的fd成员设置成一个负值。poll函数将忽略这样的pollfd结构的events成员,返回时将它们的revents成员的值置为0。

(二)poll的特点

1.最大并发数限制:poll的第二个参数nfds是第一个参数指示的结构数据的元素个数,这个nfds并没有select的限制,它只受限于系统的内存空间(可以达到系统所允许打开的最大描述符的个数,即65535)。

2.效率问题:效率和select类似。

3.内核/用户空间内存拷贝问题:和select类似。

4.事件集:poll比select要“聪明”,它将描述符和事件定义在一起,任何事件都被统一处理,编程接口简洁许多。

一方面,poll可以监听的事件类型就可以更细分为很多种。

另一方面,而且内核每次修改的是pollfd结构体的revents成员,而events成员不变,因此下次重新调用poll无需重置pollfd类型中的事件集参数(避免了类似于select使用的的“值-结果”参数)。

5.poll的定时也是由函数的最后一个参数给出,但是它是一个int类型(指定函数要等待的毫秒数),而不是timeval结构体。

五、epoll函数

(一)epoll类的三个函数

int epoll_create(int size);  
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);   
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout); 

1.int epoll_create(int size);

创建一个epoll的句柄,之后的所有操作将通过这个句柄来进行操作。size用来告诉内核这个监听的数目一共有多大。这个参数不同于select()中的第一个参数那样给出最大监听的fd+1的值。自从linux2.6.8之后,size参数是被忽略的,只要它比0大就可以。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

2.int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll的事件注册函数,它不同与select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。第一个参数是epoll_create()的返回值,第二个参数表示动作,用三个宏来表示:
EPOLL_CTL_ADD:注册新的fd到epfd中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从epfd中删除一个fd;
第三个参数是需要监听的fd,第四个参数是告诉内核需要监听什么事,struct epoll_event结构如下:

struct epoll_event  
{  
    uint32_t     events;      /* Epoll events */  
    epoll_data_t data;        /* User data variable */  
};  
typedef union epoll_data  
{  
    void        *ptr;  
    int          fd;  
    uint32_t     u32;  
    uint64_t     u64;  
} epoll_data_t;  

events可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
3.int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

收集在epoll监控的事件中已经发生的事件。参数events是分配好的epoll_event结构体数组,epoll将会把发生的事件赋值到events数组中(events不可以是空指针,内核只负责把数据复制到这个events数组中,不会去帮助我们在用户态中分配内存)。maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1是永久阻塞)。如果函数调用成功,返回对应I/O上已准备好的文件描述符数目,如返回0表示已超时。

(二)epoll的两种工作模式:LT和ET

epoll有Level-Triggered和Edge-Triggered两种工作模式。

1.ET
Edge-Triggered是只支持非阻塞模式。当一个新的事件到达时,ET模式从epoll_wait调用中获取该事件,如果这次没有将该事件对应的套接字缓冲区处理完,在这个套接字中没有新的事件再次到来时,在ET模式下是无法再次从epoll_wait调用中获取这个事件的。也就是说,ET模式下,当有事件发生时,系统只会通知你一次,也就是调用epoll_wait 返回fd后,不管事件你处理与否,或者处理完全与否,再调用epoll_wait 时,都不会再返回该fd,这样程序员要自己保证在事件发生时及时有效的处理完。比如此时fd发生了EPOLLIN事件,在调用epoll_wait 后发现此事件,程序员要保证在本次轮询中对此fd进行了读操作,并且还要循环调用recv操作,一直读到recv的返回值小于请求值,或者遇到EAGAIN错误,不然下次轮询时,如果此fd没有再次触发事件,你就没有机会知道这个fd需要你的处理。这样无形中就增加了程序员的负担和出错的机会。
所以ET模式下被唤醒(返回就绪)的条件为:
(1)对于读操作:

  • 当buffer由不可读状态变为可读的时候,即由空变为不空的时候。
  • 当有新数据到达时,即buffer中的待读内容变多的时候。
  • 当buffer中有数据可读(即buffer不空)且用户对相应fd进行EPOLL_CTL_MOD成EPOLLIN事件时。

相关验证实验可以看博客:http://blog.chinaunix.net/uid-28541347-id-4288802.html

(2)对于写操作:

  • 当buffer由不可写变为可写的时候,即由满状态变为不满状态的时候。
  • 当有旧数据被发送走时,即buffer中待写的内容变少的时候。
  • 当buffer中有可写空间(即buffer不满)且用户对相应fd进行EPOLL_CTL_MOD成EPOLLOUT事件时。

相关验证实验可以看博客:http://blog.chinaunix.net/uid-28541347-id-4296180.html

2.LT
Level-Triggered是缺省工作方式,有阻塞和非阻塞两种方式。内核告诉你一个描述符是否就绪,然后可以对就绪的fd进行IO操作。如果不做任何操作,内核还会继续通知,因此该模式下编程出错可能性小。传统的select/poll是这样的模型。此方式可以认为是一个快速的poll。
所以LT模式下被唤醒(返回就绪)的条件除了包含ET模式的所有条件,还有以下:
(1)对于读操作:

  • 当buffer中有数据,且数据被读出一部分后buffer还不空的时候,即buffer中的内容减少的时候,LT模式返回读就绪。

(2)对于写操作:

  • 当buffer不满,又写了一部分数据后仍然不满的时候,即由于写操作的速度大于发送速度造成buffer中的内容增多的时候,LT模式返回写就绪。

二者的差异在于 level-trigger 模式下只要某个 fd 处于 readable/writable 状态,无论什么时候进行 epoll_wait 都会返回该 fd;而 edge-trigger 模式下只有某个 fd 从unreadable 变为 readable 或从 unwritable 变为 writable 时,epoll_wait 才会返回该 fd。

LT:水平触发,效率会低于ET触发,尤其在大并发,大流量的情况下。但是LT对代码编写要求比较低,不容易出现问题。LT模式服务编写上的表现是:只要有数据没有被获取,内核就不断通知你,因此不用担心事件丢失的情况。
ET:边缘触发,效率非常高,在并发,大流量的情况下,会比LT少很多epoll的系统调用,因此效率高。但是对编程要求高,需要细致的处理每个请求,否则容易发生丢失事件的情况。

(三)epoll的特点

(1)epoll 没有最大并发连接的限制,上限是最大可以打开文件的数目,这个数字一般远大于 2048, 一般来说这个数目和系统内存关系很大 ,具体值可以 cat /proc/sys/fs/file-max[599534] 查看。

(2)效率提升, Epoll最大的优点就在于它基于事件的就绪通知方式:只管“活跃”的连接 ,而跟连接总数无关,其算法复杂度为O(1),因此在实际的网络环境中,epoll的效率就会远远高于 select 和 poll 。select和poll都是轮询方式的,每次调用要扫描整个注册文件描述符集合,并将其中就绪描述符返回给用户程序。epoll_wait采用的是回调方式,内核检测到就绪描述符时,将触发回调函数,回调函数就将该文件描述符上对应的事件插入内核就绪队列,不需要轮询。

(3)内存拷贝, Epoll 在这点上使用了“共享内存“,因此没有内存拷贝的开销。

epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。

epoll与select、poll的区别如下:
IO多路复用:select、poll、epoll

六、关于ET模式下的读写

在epoll的ET模式下,正确的读写方式为:

  • 读:只要可读(即buffer中还有数据),就一直读,直到返回0,或者 errno = EAGAIN
  • 写:只要可写(即buffer还有空间且用户请求写的数据还未写完),就一直写,直到数据发送完。

    注意,对于读操作,errno==EAGAIN表示读缓冲队列已空。对于写操作,errno==EAGAIN表示写缓冲队列已满,但是不一定说明用户要求写的数据已经写完。
    正确的读操作代码为:

n = 0;
memset(buf, 0, sizeof(buf));
while ((nread = read(sockfd, buf + n, MAXLINE - 1)) > 0) {
     n += nread;
}
if (nread == -1 && errno != EAGAIN) {   //errno==EAGAIN表示缓冲队列已空
     perror("read error\n");
     exit(1);
}

正确的写操作代码为:

ssize_t data_size = strlen(buf);
n = data_size;
while (true) {
      nwrite = write(sockfd, buf + data_size - n, n);
      if (nwrite < 0) {
              //当socket是非阻塞时,如返回此错误,表示写缓冲队列已满
              //在这里做延时后再重试
              if (errno == EAGAIN) {
                    usleep(1000);
                    continue;
              }else {
                    perror("write error\n");
                    exit(1);
               }
       } else if (nwrite == n) {  //表示写完了
              break;
       } else {                   //还没写完
              n -= nwrite;
       }
}

七、关于accept

(一)当服务器端使用IO多路复用(即select、poll、epoll)时,accept应工作在非阻塞模式
1.原因:
如果accept工作在阻塞模式,考虑这种情况: TCP 连接被客户端夭折,即在服务器调用 accept 之前(此时select等已经返回连接到达读就绪),客户端主动发送 RST 终止连接,导致刚刚建立的连接从就绪队列中移出,如果套接口被设置成阻塞模式,服务器就会一直阻塞在 accept 调用上,直到其他某个客户建立一个新的连接为止。但是在此期间,服务器单纯地阻塞在accept 调用上(实际应该阻塞在select上),就绪队列中的其他描述符都得不到处理。
2.解决办法:
把监听套接口设置为非阻塞, 当客户在服务器调用 accept 之前中止某个连接时,accept 调用可以立即返回 -1。
(二)ET模式下的accept操作
考虑这种情况:多个连接同时到达,服务器的 TCP 就绪队列瞬间积累多个就绪连接,由于是边缘触发模式,epoll 只会通知一次,accept 只处理一个连接,导致 TCP 就绪队列中剩下的连接都得不到处理。
解决办法是用 while 循环抱住 accept 调用,处理完 TCP 就绪队列中的所有连接后再退出循环。如何知道是否处理完就绪队列中的所有连接呢? accept 返回 -1 并且 errno 设置为 EAGAIN 就表示所有连接都处理完。
比如如下代码:

for (; ;) {
     socklen = sizeof(cliaddr);
     if ((connfd = accept(listenfd, (struct sockaddr *) &cliaddr, &socklen)) < 0) {
         //因为之前已经设置listenfd为非阻塞,所以当accept从已完成连接队列里取出所有连接后,
         //会返回EAGAIN错误表示accept处理完了,否则只是返回-1表示连接出错
         if (errno == EAGAIN || errno == EWOULDBLOCK) {
                break;
         } else {
                perror("accept error\n");
                exit(1);
         }
     } else {
           cout << "connection from " << inet_ntop(AF_INET, &cliaddr.sin_addr, buf, sizeof(cliaddr))
               << " ,port " << ntohs(cliaddr.sin_port) << endl;
           //将连接套接字注册进epoll事件
           set_nonblocking(connfd);
           ev.data.fd = connfd;
           ev.events = EPOLLIN | EPOLLET;
           if (epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev) < 0) {
                 perror("epoll_ctl error\n");
                 exit(1);
           }
     }
}

八、简单的客户端服务器例子

这个例子出自《UNIX网络编程》这本书,是执行如下步骤的一个回射服务器:
(1)客户从标准输入读入一行文本,并写给服务器。
(2)服务器从网络输入读入这行文本,并回射给客户。
(3)客户从网络输入读入这行回射文本,并显示在标准输出上。

(一)客户端

//
// Created by huxijie on 17-4-25.
// TCP回射客户端

#include <iostream>
#include <unistd.h>
#include <netinet/in.h>
#include <string.h>
#include <arpa/inet.h>
#include <stdlib.h>

using namespace std;

const static int MAXLINE = 1024;
ssize_t Read(int fd,void *ptr,size_t nbytes) {
    ssize_t n;

    if ((n = read(fd, ptr, nbytes)) == -1) {
        perror("read error\n");
    }
    return n;
}

void Write(int fd,void *ptr,size_t nbytes) {
    if (write(fd, ptr, nbytes) != nbytes) {
        perror("write error\n");
        return;
    }
}

//TCP回射客户程序
void str_cli(FILE *fp,int sockfd) {
    int maxfdpl,stdineof;
    fd_set rset;
    char sendline[MAXLINE], recvline[MAXLINE];
    int n;


    stdineof = 0;       //标志,只要该值为0,则每次在主循环中select标准输入的可读性
    FD_ZERO(&rset);
    for (; ;) {
        if (stdineof == 0) {
            FD_SET(fileno(fp), &rset);
        }
        FD_SET(sockfd, &rset);
        maxfdpl = max(fileno(fp), sockfd) + 1;

        if (select(maxfdpl, &rset, NULL, NULL, NULL) < 0) {
            perror("select error\n");
            return;
        }

        if (FD_ISSET(fileno(fp), &rset)) {
            //当我们在标准输入上碰到EOF时,把新标志stdineof置为1,
            //并调用shutdown来发送FIN,从fd_set中移除fp
            if ((n = Read(fileno(fp), sendline, MAXLINE)) == 0) {
                stdineof = 1;
                shutdown(sockfd, SHUT_WR);
                FD_CLR(fileno(fp), &rset);
                continue;
            }
            Write(sockfd, sendline, n);
        }

        if (FD_ISSET(sockfd, &rset)) {
            if ((n = Read(sockfd, recvline, MAXLINE)) == 0) {
                if (1 == stdineof) {
                    return;
                } else {
                    perror("str_cli:server terminated permaturely\n");
                    return;
                }
            }
            Write(fileno(stdout), recvline, n);
        }
    }
}

int main(int argc,char **argv) {
    int sockfd;
    struct sockaddr_in servaddr;
    const int PORT = 8080;
    const char *IP = "127.0.0.1";


    if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        perror("socket error\n");
    }

    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(PORT);
    if (inet_pton(AF_INET, IP, &servaddr.sin_addr) <= 0) {
        perror("inet_pton error\n");
    }

    if (connect(sockfd, (struct sockaddr *) &servaddr, sizeof(servaddr)) < 0) {
        perror("connect error\n");
    }
    str_cli(stdin, sockfd);
    exit(0);
}

(二)服务器
1.用多进程实现并发编程

//
// Created by huxijie on 17-4-25.
// TCP回射服务器程序

#include <iostream>
#include <unistd.h>
#include <netinet/in.h>
#include <string.h>
#include <arpa/inet.h>
#include <signal.h>
#include <sys/wait.h>

using namespace std;

const char *IP = "127.0.0.1";
const int PORT = 8080;
const static int MAXLINE = 1024;
const static int LISTENQ = 1024;

ssize_t writen(int fd,const void *vptr,size_t n) {
    size_t nleft;
    ssize_t nwriten;
    const char *ptr;

    nleft = n;
    ptr = (const char *) vptr;
    while (nleft > 0) {
        if ((nwriten = write(fd, ptr, nleft)) <= 0) {
            if (nwriten < 0 && errno == EINTR) {
                nwriten = 0;
            } else {
                return -1;
            }
        }
        nleft -= nwriten;
        ptr += nwriten;
    }
    return n;
}

//在套接字上回射数据
void str_echo(int sockfd) {
    ssize_t n;
    char buf[MAXLINE];
    again:
    while ((n = read(sockfd, buf, MAXLINE)) > 0) {
        writen(sockfd, buf, n);
    }
        if (n < 0 && errno == EINTR) {
            goto again;
        } else if (n < 0) {
            perror("str_echo:read error\n");
        }
}

typedef void Sigfunc(int);
Sigfunc *my_signal(int signo,Sigfunc *func) {
    struct sigaction act, oact;

    sigemptyset(&act.sa_mask);
    act.sa_flags = 0;
    act.sa_handler = func;
    act.sa_flags |= SA_RESTART;

    if (sigaction(signo, &act, & oact) < 0) {
        return SIG_ERR;
    } else {
        return oact.sa_handler;
    }
}

void sig_chld(int signo) {
    pid_t pid;
    int stat;

    while ((pid = waitpid(-1, &stat, WNOHANG)) > 0) {
        cout << "child " << pid << " termined" << endl;
    }
    return;
}

//多进程
int main(int argc,char **argv) {
    int listenfd, connfd;
    struct sockaddr_in servaddr, cliaddr;
    socklen_t socklen = sizeof(servaddr);
    pid_t pid;
    char buff[MAXLINE];


    if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        perror("socket error\n");
    }

    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(PORT);
    if (inet_pton(AF_INET, IP, &servaddr.sin_addr) <= 0) {
        perror("inet_pton error\n");
    }

    if (bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr)) < 0) {
        perror("bind error\n");
    }

    if (listen(listenfd, LISTENQ) < 0) {
        perror("listen error\n");
    }

    my_signal(SIGCHLD, sig_chld);

    while (true) {
        if ((connfd = accept(listenfd, (struct sockaddr*)&cliaddr, &socklen)) < 0) {
            if (errno == EINTR) {
                continue;
            } else {
                perror("accept error\n");
            }
        }

        if ((pid = fork()) == 0) {
            if (close(listenfd) < 0) {
                perror("close error\n");
            }
            cout << "connection from " << inet_ntop(AF_INET, &cliaddr.sin_addr, buff, MAXLINE)
            << " , port " << ntohs(cliaddr.sin_port) << endl;
            str_echo(connfd);
            if (close(connfd) < 0) {
                perror("close error\n");
            }
            _exit(0);
        }

        if (close(connfd) < 0) {
            perror("close error\n");
        }
    }
}

2.使用select

//
// Created by huxijie on 17-4-25.
// TCP回射服务器程序

#include <iostream>
#include <unistd.h>
#include <netinet/in.h>
#include <string.h>
#include <arpa/inet.h>
#include <signal.h>
#include <sys/wait.h>

using namespace std;

const static int MAXLINE = 1024;
const static int LISTENQ = 1024;
const char *IP = "127.0.0.1";
const int PORT = 8080;

ssize_t Read(int fd,void *ptr,size_t nbytes) {
    ssize_t n;

    if ((n = read(fd, ptr, nbytes)) == -1) {
        perror("read error\n");
    }
    return n;
}

void Write(int fd,void *ptr,size_t nbytes) {
    if (write(fd, ptr, nbytes) != nbytes) {
        perror("write error\n");
        return;
    }
}

//将套接字设置为非阻塞方式
void set_nonblocking(int sockfd) {
    int opts;

    if ((opts = fcntl(sockfd, F_GETFL)) < 0) {
        perror("fcntl(F_GETFL) error\n");
        exit(1);
    }

    opts |= O_NONBLOCK;
    if (fcntl(sockfd, F_SETFL, opts) < 0) {
        perror("fcntl(F_SETFL) error\n");
        exit(1);
    }
}


//使用select
int main(int argc,char **argv) {
    int i, maxi, maxfd, listenfd, connfd, sockfd;
    int nready, client[FD_SETSIZE];
    ssize_t n;
    fd_set rset, allset;
    char buf[MAXLINE];
    socklen_t clilen;
    struct sockaddr_in cliaddr, servaddr;

    if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        perror("socket error\n");
        return -1;
    }

    set_nonblocking(listenfd);

    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(PORT);
    if (inet_pton(AF_INET, IP, &servaddr.sin_addr) <= 0) {
        perror("inet_pton error\n");
        return -1;
    }

    if (bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0) {
        perror("bind error\n");
        return -1;
    }

    if (listen(listenfd, LISTENQ) < 0) {
        perror("listen error\n");
        return -1;
    }

    maxfd = listenfd;
    maxi = -1;
    for (i = 0; i < FD_SETSIZE; i++) {
        client[i] = -1;
    }
    FD_ZERO(&allset);
    FD_SET(listenfd,&allset);

    while (true) {
        rset = allset;
        if ((nready = select(maxfd + 1, &rset, NULL, NULL, NULL)) < 0) {
            perror("select error\n");
            return -1;
        }

        if (FD_ISSET(listenfd, &rset)) {    //新客户端连接请求
            clilen = sizeof(cliaddr);
            if ((connfd = accept(listenfd, (struct sockaddr *) &cliaddr, &clilen)) < 0) {
                perror("accept error\n");
                return -1;
            }

            cout << "connection from " << inet_ntop(AF_INET, &cliaddr.sin_addr, buf, sizeof(buf))
            << ", port " << ntohs(cliaddr.sin_port) << endl;

            for (i = 0; i < FD_SETSIZE; ++i) {
                if (client[i] < 0) {
                    client[i] = connfd;     //保存新的描述符
                    break;
                }
            }
            if (i == FD_SETSIZE) {
                perror("too many clients\n");
                return -1;
            }

            FD_SET(connfd,&allset);         //加入新的描述符到集合中
            if (connfd > maxfd) {
                maxfd = connfd;
            }
            if (i > maxi) {
                maxi = i;       //在client[]数组中最大下标
            }
            if (--nready <= 0) {
                continue;       //若就绪数目减1后变为0,则直接跳过后续操作进入下一次while中
            }
        }

        for (i = 0; i <= maxi; ++i) {   //检查所有已连接描述符是否可读
            if ((sockfd = client[i]) < 0) {
                continue;
            }

            if (FD_ISSET(sockfd, &rset)) {
                if ((n = Read(sockfd, buf, MAXLINE)) == 0) {
                    if (close(sockfd) < 0) {
                        perror("close error\n");
                        return -1;
                    }
                    FD_CLR(sockfd,&allset);     //客户关闭了连接,所以要从集合中去掉本连接描述符
                    client[i] = -1;
                } else {
                    Write(sockfd, buf, n);
                }

                if (--nready <= 0) {
                    break;     //若就绪数目减1后变为0,则直接跳过后续操作进入下一次while中
                }
            }
        }
    }
}

3.使用poll
和上面使用select的程序不同的只是main函数。

//使用poll
int main(int argc,char **argv) {
    int i, maxi, listenfd, connfd, sockfd;
    int nready;
    ssize_t n;
    char buf[MAXLINE];
    socklen_t socklen;
    struct sockaddr_in cliaddr, servaddr;
    const int OPEN_MAX = sysconf(_SC_OPEN_MAX);
    struct pollfd client[OPEN_MAX];

    if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        perror("socket error\n");
        return -1;
    }

    set_nonblocking(listenfd);

    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(PORT);
    if (inet_pton(AF_INET, IP, &servaddr.sin_addr) < 0) {
        perror("inet_pton error\n");
        return -1;
    }

    if (bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr)) < 0) {
        perror("bind error\n");
        return -1;
    }

    if (listen(listenfd, LISTENQ) < 0) {
        perror("listen error\n");
        return -1;
    }

    client[0].fd = listenfd;
    client[0].events = POLLRDNORM;
    for (i = 1; i < OPEN_MAX; ++i) {
        client[i].fd = -1;
    }
    maxi = 0;

    while (true) {
        if ((nready = poll(client, maxi + 1, -1)) < 0) {
            perror("poll error\n");
            return -1;
        }

        if (client[0].revents & POLLRDNORM) {
            socklen = sizeof(cliaddr);
            if ((connfd = accept(listenfd, (struct sockaddr *) &cliaddr, &socklen)) < 0) {
                perror("accept error\n");
                return -1;
            }

            cout << "connection from " << inet_ntop(AF_INET, &cliaddr.sin_addr, buf, sizeof(cliaddr))
            << " ,port " << ntohs(cliaddr.sin_port) << endl;

            for (i = 1; i < OPEN_MAX; ++i) {
                if (client[i].fd < 0) {
                    client[i].fd = connfd;
                    break;
                }
            }
            if (i == OPEN_MAX) {
                perror("too many clients\n");
                return -1;
            }

            client[i].events = POLLRDNORM;
            if (i > maxi) {
                maxi = i;
            }
            if (--nready <= 0) {
                continue;
            }
        }

        for (i = 1; i <= maxi; ++i) {
            if ((sockfd = client[i].fd) < 0) {
                continue;
            }
            if (client[i].revents & (POLLRDNORM | POLLERR)) {
                if ((n = read(sockfd, buf, MAXLINE-1)) < 0) {
                    if (errno == ECONNRESET) {
                        if (close(sockfd) < 0) {
                            perror("close error\n");
                            return -1;
                        }
                        client[i].fd = -1;
                    } else {
                        perror("read error\n");
                    }
                } else if (0 == n) {
                    if (close(sockfd) < 0) {
                        perror("close error\n");
                        return -1;
                    }
                    client[i].fd = -1;
                } else {
                    Write(sockfd, buf, n);
                }

                if (--nready <= 0) {
                    break;
                }
            }
        }
    }
}

4.使用epoll

//将套接字设置为非阻塞方式
void set_nonblocking(int sockfd) {
    int opts;

    if ((opts = fcntl(sockfd, F_GETFL)) < 0) {
        perror("fcntl(F_GETFL) error\n");
        exit(1);
    }

    opts |= O_NONBLOCK;
    if (fcntl(sockfd, F_SETFL, opts) < 0) {
        perror("fcntl(F_SETFL) error\n");
        exit(1);
    }
}

//使用epoll
int main(int argc,char **argv) {
    const int OPEN_MAX = 1024;
    const int MAXEVENTS = 64;
    int epfd,listenfd,connfd,sockfd;
    ssize_t n,nread,nwrite;
    sockaddr_in servaddr, cliaddr;
    socklen_t socklen;
    struct epoll_event ev,events[MAXEVENTS];    //ev用于注册事件,数组用于回传要处理的事件
    char buf[MAXLINE];


    //创建listenfd监听套接字
    if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        perror("socket error\n");
        exit(1);
    }

    //设置非阻塞
    set_nonblocking(listenfd);

    //创建一个epoll的句柄,该句柄占用一个fd值,所以epoll使用完后要关闭
    if ((epfd = epoll_create(MAXEVENTS)) < 0) {
        perror("epoll_create error\n");
        exit(1);
    }
    //设置与要处理的事件相关的文件描述符
    ev.data.fd = listenfd;
    //设置要处理的事件类型
    ev.events = EPOLLIN | EPOLLET;
    //注册epoll事件
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev) < 0) {
        perror("epoll_ctl error\n");
        exit(1);
    }

    //设置servaddr
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(PORT);
    if (inet_pton(AF_INET, IP, &servaddr.sin_addr) < 0) {
        perror("inet_pton error\n");
        exit(1);
    }

    //绑定
    if (bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr)) < 0) {
        perror("bind error\n");
        exit(1);
    }

    //监听
    if (listen(listenfd, LISTENQ) < 0) {
        perror("listen error\n");
        exit(1);
    }

    int nfds;
    for (; ;) {
        //收集发生的事件,nfds是已经准备好的描述符个数
        //下面的for循环只扫描已经准备好的描述符,这正是epoll比poll的高效之处
        //定时器时间设置为-1表示一直等待直到有事件就绪
        if ((nfds = epoll_wait(epfd, events, MAXEVENTS, -1)) < 0) {
            perror("epoll_wait error\n");
            exit(1);
        }

        for (int i = 0; i < nfds; ++i) {
            sockfd = events[i].data.fd;
            if (sockfd == listenfd) {
                for (; ;) {
                    socklen = sizeof(cliaddr);
                    if ((connfd = accept(listenfd, (struct sockaddr *) &cliaddr, &socklen)) < 0) {
                        //因为之前已经设置listenfd为非阻塞,所以当accept从已完成连接队列里取出所有连接后,
                        //会返回EAGAIN错误表示accept处理完了,否则只是返回-1表示连接出错
                        if (errno == EAGAIN || errno == EWOULDBLOCK) {
                            break;
                        } else {
                            perror("accept error\n");
                            exit(1);
                        }
                    } else {
                        cout << "connection from " << inet_ntop(AF_INET, &cliaddr.sin_addr, buf, sizeof(cliaddr))
                        << " ,port " << ntohs(cliaddr.sin_port) << endl;
                        //将连接套接字注册进epoll事件
                        set_nonblocking(connfd);
                        ev.data.fd = connfd;
                        ev.events = EPOLLIN | EPOLLET;
                        if (epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev) < 0) {
                            perror("epoll_ctl error\n");
                            exit(1);
                        }
                    }
                }
                continue;
            } else {
                if (events[i].events & EPOLLIN) {
                    n = 0;
                    memset(buf, 0, sizeof(buf));
                    while ((nread = read(sockfd, buf + n, MAXLINE - 1)) > 0) {
                        n += nread;
                    }
                    if (nread == -1 && errno != EAGAIN) {   //errno==EAGAIN表示缓冲队列已空
                        perror("read error\n");
                        exit(1);
                    }

                    //缓冲队列中的数据已经读取完毕,所以更新该fd为EPOLLOUT
                    ev.data.fd = sockfd;
                    ev.events = events[i].events | EPOLLOUT;
                    if (epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev) < 0) {
                        perror("epoll_ctl error\n");
                        exit(1);
                    }
                }
                if (events[i].events & EPOLLOUT) {
                    ssize_t data_size = strlen(buf);
                    n = data_size;
                    while (true) {
                        nwrite = write(sockfd, buf + data_size - n, n);
                        if (nwrite < 0) {
                            //当socket是非阻塞时,如返回此错误,表示写缓冲队列已满
                            //在这里做延时后再重试
                            if (errno == EAGAIN) {
                                usleep(1000);
                                continue;
                            }else {
                                perror("write error\n");
                                exit(1);
                            }
                        } else if (nwrite == n) {  //表示写完了
                            break;
                        } else {                   //还没写完
                            n -= nwrite;
                        }
                    }
                    //数据发送完毕,更新该fd为EPOLLIN
                    ev.data.fd = sockfd;
                    ev.events = EPOLLIN | EPOLLET;
                    if (epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev) < 0) {
                        perror("epoll_ctl error\n");
                        exit(1);
                    }
                }
            }
        }
    }
}

以上4种方式所编写的程序实现的效果都是一样的。
先后运行服务器和客户端:
IO多路复用:select、poll、epoll
在客户端里输入一行字符串后,会回显该字符串:
IO多路复用:select、poll、epoll
当打开另一个客户端时:
IO多路复用:select、poll、epoll
在新的客户端里输入一行字符串:
IO多路复用:select、poll、epoll

参考

《UNIX网络编程》卷1:套接字联网API

知乎:https://www.zhihu.com/question/19732473

historyasamirror:http://blog.csdn.net/historyasamirror/article/details/5778378

Sunshine_top:http://blog.csdn.net/u013074465/article/details/44964835

ljx0305:http://blog.csdn.net/ljx0305/article/details/4065058

mirkerson:http://blog.csdn.net/mirkerson/article/details/7468347

shuxiaogd:http://blog.csdn.net/shuxiaogd/article/details/50366039

zhang_shuai_2011:http://blog.csdn.net/zhang_shuai_2011/article/details/7675797

lvyilong316:http://blog.chinaunix.net/uid/28541347/cid-191916-list-3.html