IO多路复用:select、poll、epoll
一、同步异步、阻塞非阻塞的概念区分
首先,一个 输入操作通常包括两个不同的阶段:
(1)等待数据准备好
(2)从内核向进程复制数据
对于一个套接字上的输入操作,第一步通常涉及等待数据从网络中到达。当所等待分组到达时,它被复制到内核中的某个缓冲区。第二步就是把数据从缓冲区复制到应用进程缓冲区。
1.同步与异步
同步和异步关注的是消息通信机制 (synchronous communication/ asynchronous communication)
所谓同步,就是在发出一个调用时,在没有得到结果之前,该调用就不返回。但是一旦调用返回,就得到返回值了。
换句话说,就是由调用者主动等待这个调用的结果。
而异步则是相反,调用在发出之后,这个调用就直接返回了,所以没有返回结果。换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果。而是在调用发出后,被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用。
2.阻塞与非阻塞
阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态.
阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。
非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。
举个例子来帮助理解同步异步、阻塞非阻塞:
假如你用水壶烧水。
(1)你把普通水壶放在电磁炉上,站在一旁等水开。(同步阻塞)
(2)你把普通水壶放在电磁炉上,然后去客厅看电视了,时不时去厨房看看水开了没有。(同步非阻塞)
(3)你把煮开时会发出提示声的改进水壶放在电磁炉上,站在一旁等水开。(异步阻塞)
(4)你把煮开时会发出提示声的改进水壶放在电磁炉上,然后去客厅看电视了,水壶响之前不再去看它了,响了再去拿壶。(异步非阻塞)
对于访问数据,阻塞非阻塞是说针对在得知访问的数据是否就绪这一问题,进程/线程是否需要等待(在此期间不能做其它事),同步异步是说进程/线程是否需要主动读写数据,同步则是需要主动读写数据,在读写数据的过程中还是会阻塞,异步则是只需要I/O操作完成的通知,进程/线程并不主动读写数据,由操作系统内核完成数据的读写。
二、Unix下的5种IO模型
- 阻塞式IO
- 非阻塞式IO
- IO复用
- 信号驱动式IO
-
异步IO
POSIX把同步IO和异步IO定义如下:
- 同步IO操作导致请求进程阻塞,直到IO操作完成。
- 异步IO操作不导致请求进程阻塞。
这五种IO模型里前四种都属于同步IO。
(一)阻塞式IO模型
最流行的IO模型是阻塞式IO,默认情形下,所有套接字都是阻塞的。
如图,此处recvfrom视为一个系统调用用以IO操作。当进程调用recvfrom时,内核就开始了IO的第一个阶段:准备数据。对于网络IO来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的UDP包),这个时候内核就要等待足够的数据到来。而在用户进程这边,整个进程会被阻塞。当内核一直等到数据准备好了,它就会将数据从内核中拷贝到用户空间,然后内核返回结果,用户进程才解除阻塞的状态,开始处理数据。
所以,阻塞式 IO的特点就是在IO执行的两个阶段都被block了。这导致用户在发起IO请求时,不能做任何事情,对CPU的资源利用率不够。
(二)非阻塞式IO模型
进程把一个套接字设置成非阻塞是在通知内核:当所请求的IO操作非得把本进程投入睡眠才能完成时,不要把本进程投入睡眠,而是返回一个错误。
也就是说,当用户进程发出recvfrom操作时,如果内核中的数据还没有准备好,那么它并不会阻塞用户进程,而是立刻返回一个EWOULDBLOCK错误。从用户进程角度讲 ,它发起一个recvfrom操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送recvfrom操作。一旦某次再次调用recvfrom时内核中的数据准备好了,那么内核马上就将数据拷贝到了用户空间,然后返回。
所以,整个IO请求的过程中,虽然用户线程每次发起IO请求后可以立即返回,但是为了等到数据,仍需要不断地轮询、重复请求,消耗了大量的CPU的资源。
(三)IO复用模型
有了IO复用,我们就可以调用select或poll或epoll,阻塞在这三个函数之上,而不是阻塞在真正的IO系统调用上。就拿select来说吧,进程阻塞于select调用 ,此时内核会监视所有select负责的套接字,当任何一个socket中的数据准备好了,select就会返回可读条件,之后用户进程就可以调用recvfrom直接让内核将数据拷贝到用户空间。使用select需要两个而不是单个系统调用,使用select的优势在于可以等待多个描述符就绪。所以,如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。
(四)信号驱动式IO模型
用户进程首先开启套接字的信号驱动式IO功能,并通过sigaction系统调用安装一个信号处理函数。该系统调用将立即返回,用户进程可以继续做其它的事,也就是说它没有被阻塞。当数据报准备好供读取时,内核就为该进程产生一个SIGIO信号。用户进程随后在信号处理函数中调用recvfrom直接让内核将数据拷贝到用户空间。
(五)异步IO模型
用户进程发起aio_read操作之后,立刻就可以开始做其它的事。在这期间,内核会在整个操作(包括等待数据和将数据从内核拷贝到用户空间)完成后给用户进程发送一个信号来通知。这种模型与信号驱动式IO模型的主要区别在于:信号驱动式IO是由内核通知用户进程何时可以启动一个IO操作,而异步IO模型是由内核通知用户进程IO操作何时完成了。
最后贴出5种IO模型的直观比较:
三、select函数
select函数允许进程指示内核等待多个事件中的任何一个发生,并只在有一个或多个事件发生或经历一段指定的时间后才唤醒它。
(一)select函数及参数说明:
#include <sys/select.h>
#include <sys/time.h>
int select(int maxfdp1, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
//返回:若有就绪描述符则返回就绪描述符的数目,若超时返回0,若出错返回-1
1.timeout:告知内核等待指定描述符中任何一个就绪花费的最长时间,其timeval结构用于指定秒数和微妙数。
//timeval结构:
struct timeval
{
long tv_sec; /* seconds */
long tv_usec; /* microseconds */
};
这个参数有以下三种可能:
timeout== NULL:永远等待下去,即仅当一个描述符准备好I/O时才返回。
timeout->tv_sec != 0 || tvptr->tv_usec !=0:等待一段固定的时间,超时返回0;在这段时间内如果有描述符准备好就返回。
timeout->tv_sec == 0 && tvptr->tv_usec == 0:根本不等待,即检查描述符后立即返回,这称为轮询(非阻塞式I/O就是轮询)。
2.中间的三个参数:指定要让内核测试读、写、异常的描述符,若对某一个不感兴趣可置为NULL。
这三个参数都是值-结果参数,调用函数时,用于指定所关心的描述符的值;函数返回时,结果将指示哪些描述符已经就绪。该函数返回后,使用FD_ISSET宏来测试fd_set数据类型中的描述符。描述符集内任何与未就绪描述符对应的位返回时均清为0。为此,每次重新调用select函数时,我们都得再次把所有描述符集内所关心的位均置为1.
select使用描述符集,通常是一个整数数组,其中每个整数的一位对应一个描述符。举例来说,假设使用32位整数,那么该数组的第一个元素对应于描述符0~31,第二个元素对应于描述符32~63,依此类推。所有这些实现细节都与应用程序无关,它们隐藏在为fd_set的数据类型和以下四个宏中:
FD_CLR(int fd, fd_set *set); //关闭fd_set中的fd位
FD_ISSET(int fd, fd_set *set); //测试该位是否打开,如果为1则该位对应描述符就绪
FD_SET(int fd, fd_set *set); //打开该fd位
FD_ZERO(fd_set *set); //清空所有位
如下打开描述符1、4位:
fd_set rset;
FD_ZERO(&rset); //清空所有,每次调用select都要清空为0
FD_SET(1,&rset); //打开描述符1
FD_SET(4,&rset); //打开描述符4
3.maxfdp1参数指定待测试的描述符的个数,其值为最大待测试描述符加1。例如上例打开1、4描述符,那么这里maxfdp1值为5。
(二)select的特点
1.最大并发数限制:因为一个进程所打开的 FD (文件描述符)是有限制的,由 FD_SETSIZE 设置,默认值是 1024,因此 Select 模型的最大并发数就被相应限制了。如果要改变FD_SIZE的大小需要重新编译内核。
2.效率问题:select 每次调用都会线性扫描全部的 FD 集合,花费时间为O(n),这样效率就会呈现线性下降,即使将 FD_SETSIZE 改大其性能也会很差。
3. 内核/用户空间内存拷贝问题:select 采取了内存拷贝方法让内核把 FD 消息通知给用户空间。
4.事件集:select的参数类型fd_set没有将文件描述符和事件绑定,它仅仅是一个文件描述符的集合,因此select需要提供3个“值-结果”类型的参数分别传入和输出可读、可写和异常等事件(调用该函数时,指定所关心的描述符的值,函数返回时,结果将指示哪些描述符已经就绪),也就是select函数会修改指针readset、writeset、exceptset所指向的描述符集。
一方面,使得select不能处理更多类型的事件,所能处理的事件类型只有读写异常三类;
另一方面,描述符集内任何与未就绪描述符对应的位返回时都会被清空,因此每次重新调用select时,都需要再次把所有的描述符集内所关心的位置为1。
5.select函数的定时是由函数的最后一个参数决定的,它是一个timeval结构体,用于指定这段时间的秒数和微秒数。
四、poll函数
(一)poll函数及其参数说明
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
//返回:如有就绪描述符就返回其数目,超时返回0,若出错返回-1.
1.第一个参数是指向一个结构数组第一个元素的指针。每个元素是一个pollfd结构,用于指定测试某个给定描述符fd的条件。
//pollfd结构体
struct pollfd
{
int fd; /* file descriptor :要测试的描述符*/
short events; /* requested events: 要测试的事件 */
short revents; /* returned events : 返回该描述符的状态*/
};
pollfd结构的成员events和revents避免了使用值-结果参数。这点与select不同。
用于指定这两个成员的一些常值如下:
2.第二个参数nfds:指定第一个参数结构数组中元素的个数。
3.第三个参数timeout:指定poll函数返回前等待多长时间。
INFTIM常值被定义为一个负值。
当发生错误时,poll函数的返回值为-1,若定时器到时之前没有任何描述符就绪,则返回0,否则返回就绪描述符的个数,即revents成员值非0的描述符个数。
如果我们不关心某个特定描述符,那么可以把与它对应的pollfd结构的fd成员设置成一个负值。poll函数将忽略这样的pollfd结构的events成员,返回时将它们的revents成员的值置为0。
(二)poll的特点
1.最大并发数限制:poll的第二个参数nfds是第一个参数指示的结构数据的元素个数,这个nfds并没有select的限制,它只受限于系统的内存空间(可以达到系统所允许打开的最大描述符的个数,即65535)。
2.效率问题:效率和select类似。
3.内核/用户空间内存拷贝问题:和select类似。
4.事件集:poll比select要“聪明”,它将描述符和事件定义在一起,任何事件都被统一处理,编程接口简洁许多。
一方面,poll可以监听的事件类型就可以更细分为很多种。
另一方面,而且内核每次修改的是pollfd结构体的revents成员,而events成员不变,因此下次重新调用poll无需重置pollfd类型中的事件集参数(避免了类似于select使用的的“值-结果”参数)。
5.poll的定时也是由函数的最后一个参数给出,但是它是一个int类型(指定函数要等待的毫秒数),而不是timeval结构体。
五、epoll函数
(一)epoll类的三个函数
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
1.int epoll_create(int size);
创建一个epoll的句柄,之后的所有操作将通过这个句柄来进行操作。size用来告诉内核这个监听的数目一共有多大。这个参数不同于select()中的第一个参数那样给出最大监听的fd+1的值。自从linux2.6.8之后,size参数是被忽略的,只要它比0大就可以。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。
2.int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epoll的事件注册函数,它不同与select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。第一个参数是epoll_create()的返回值,第二个参数表示动作,用三个宏来表示:
EPOLL_CTL_ADD:注册新的fd到epfd中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从epfd中删除一个fd;
第三个参数是需要监听的fd,第四个参数是告诉内核需要监听什么事,struct epoll_event结构如下:
struct epoll_event
{
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
typedef union epoll_data
{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
events可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
3.int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
收集在epoll监控的事件中已经发生的事件。参数events是分配好的epoll_event结构体数组,epoll将会把发生的事件赋值到events数组中(events不可以是空指针,内核只负责把数据复制到这个events数组中,不会去帮助我们在用户态中分配内存)。maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1是永久阻塞)。如果函数调用成功,返回对应I/O上已准备好的文件描述符数目,如返回0表示已超时。
(二)epoll的两种工作模式:LT和ET
epoll有Level-Triggered和Edge-Triggered两种工作模式。
1.ET
Edge-Triggered是只支持非阻塞模式。当一个新的事件到达时,ET模式从epoll_wait调用中获取该事件,如果这次没有将该事件对应的套接字缓冲区处理完,在这个套接字中没有新的事件再次到来时,在ET模式下是无法再次从epoll_wait调用中获取这个事件的。也就是说,ET模式下,当有事件发生时,系统只会通知你一次,也就是调用epoll_wait 返回fd后,不管事件你处理与否,或者处理完全与否,再调用epoll_wait 时,都不会再返回该fd,这样程序员要自己保证在事件发生时及时有效的处理完。比如此时fd发生了EPOLLIN事件,在调用epoll_wait 后发现此事件,程序员要保证在本次轮询中对此fd进行了读操作,并且还要循环调用recv操作,一直读到recv的返回值小于请求值,或者遇到EAGAIN错误,不然下次轮询时,如果此fd没有再次触发事件,你就没有机会知道这个fd需要你的处理。这样无形中就增加了程序员的负担和出错的机会。
所以ET模式下被唤醒(返回就绪)的条件为:
(1)对于读操作:
- 当buffer由不可读状态变为可读的时候,即由空变为不空的时候。
- 当有新数据到达时,即buffer中的待读内容变多的时候。
- 当buffer中有数据可读(即buffer不空)且用户对相应fd进行EPOLL_CTL_MOD成EPOLLIN事件时。
相关验证实验可以看博客:http://blog.chinaunix.net/uid-28541347-id-4288802.html
(2)对于写操作:
- 当buffer由不可写变为可写的时候,即由满状态变为不满状态的时候。
- 当有旧数据被发送走时,即buffer中待写的内容变少的时候。
- 当buffer中有可写空间(即buffer不满)且用户对相应fd进行EPOLL_CTL_MOD成EPOLLOUT事件时。
相关验证实验可以看博客:http://blog.chinaunix.net/uid-28541347-id-4296180.html
2.LT
Level-Triggered是缺省工作方式,有阻塞和非阻塞两种方式。内核告诉你一个描述符是否就绪,然后可以对就绪的fd进行IO操作。如果不做任何操作,内核还会继续通知,因此该模式下编程出错可能性小。传统的select/poll是这样的模型。此方式可以认为是一个快速的poll。
所以LT模式下被唤醒(返回就绪)的条件除了包含ET模式的所有条件,还有以下:
(1)对于读操作:
- 当buffer中有数据,且数据被读出一部分后buffer还不空的时候,即buffer中的内容减少的时候,LT模式返回读就绪。
(2)对于写操作:
- 当buffer不满,又写了一部分数据后仍然不满的时候,即由于写操作的速度大于发送速度造成buffer中的内容增多的时候,LT模式返回写就绪。
二者的差异在于 level-trigger 模式下只要某个 fd 处于 readable/writable 状态,无论什么时候进行 epoll_wait 都会返回该 fd;而 edge-trigger 模式下只有某个 fd 从unreadable 变为 readable 或从 unwritable 变为 writable 时,epoll_wait 才会返回该 fd。
LT:水平触发,效率会低于ET触发,尤其在大并发,大流量的情况下。但是LT对代码编写要求比较低,不容易出现问题。LT模式服务编写上的表现是:只要有数据没有被获取,内核就不断通知你,因此不用担心事件丢失的情况。
ET:边缘触发,效率非常高,在并发,大流量的情况下,会比LT少很多epoll的系统调用,因此效率高。但是对编程要求高,需要细致的处理每个请求,否则容易发生丢失事件的情况。
(三)epoll的特点
(1)epoll 没有最大并发连接的限制,上限是最大可以打开文件的数目,这个数字一般远大于 2048, 一般来说这个数目和系统内存关系很大 ,具体值可以 cat /proc/sys/fs/file-max[599534] 查看。
(2)效率提升, Epoll最大的优点就在于它基于事件的就绪通知方式:只管“活跃”的连接 ,而跟连接总数无关,其算法复杂度为O(1),因此在实际的网络环境中,epoll的效率就会远远高于 select 和 poll 。select和poll都是轮询方式的,每次调用要扫描整个注册文件描述符集合,并将其中就绪描述符返回给用户程序。epoll_wait采用的是回调方式,内核检测到就绪描述符时,将触发回调函数,回调函数就将该文件描述符上对应的事件插入内核就绪队列,不需要轮询。
(3)内存拷贝, Epoll 在这点上使用了“共享内存“,因此没有内存拷贝的开销。
epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。
epoll与select、poll的区别如下:
六、关于ET模式下的读写
在epoll的ET模式下,正确的读写方式为:
- 读:只要可读(即buffer中还有数据),就一直读,直到返回0,或者 errno = EAGAIN
-
写:只要可写(即buffer还有空间且用户请求写的数据还未写完),就一直写,直到数据发送完。
注意,对于读操作,errno==EAGAIN表示读缓冲队列已空。对于写操作,errno==EAGAIN表示写缓冲队列已满,但是不一定说明用户要求写的数据已经写完。
正确的读操作代码为:
n = 0;
memset(buf, 0, sizeof(buf));
while ((nread = read(sockfd, buf + n, MAXLINE - 1)) > 0) {
n += nread;
}
if (nread == -1 && errno != EAGAIN) { //errno==EAGAIN表示缓冲队列已空
perror("read error\n");
exit(1);
}
正确的写操作代码为:
ssize_t data_size = strlen(buf);
n = data_size;
while (true) {
nwrite = write(sockfd, buf + data_size - n, n);
if (nwrite < 0) {
//当socket是非阻塞时,如返回此错误,表示写缓冲队列已满
//在这里做延时后再重试
if (errno == EAGAIN) {
usleep(1000);
continue;
}else {
perror("write error\n");
exit(1);
}
} else if (nwrite == n) { //表示写完了
break;
} else { //还没写完
n -= nwrite;
}
}
七、关于accept
(一)当服务器端使用IO多路复用(即select、poll、epoll)时,accept应工作在非阻塞模式
1.原因:
如果accept工作在阻塞模式,考虑这种情况: TCP 连接被客户端夭折,即在服务器调用 accept 之前(此时select等已经返回连接到达读就绪),客户端主动发送 RST 终止连接,导致刚刚建立的连接从就绪队列中移出,如果套接口被设置成阻塞模式,服务器就会一直阻塞在 accept 调用上,直到其他某个客户建立一个新的连接为止。但是在此期间,服务器单纯地阻塞在accept 调用上(实际应该阻塞在select上),就绪队列中的其他描述符都得不到处理。
2.解决办法:
把监听套接口设置为非阻塞, 当客户在服务器调用 accept 之前中止某个连接时,accept 调用可以立即返回 -1。
(二)ET模式下的accept操作
考虑这种情况:多个连接同时到达,服务器的 TCP 就绪队列瞬间积累多个就绪连接,由于是边缘触发模式,epoll 只会通知一次,accept 只处理一个连接,导致 TCP 就绪队列中剩下的连接都得不到处理。
解决办法是用 while 循环抱住 accept 调用,处理完 TCP 就绪队列中的所有连接后再退出循环。如何知道是否处理完就绪队列中的所有连接呢? accept 返回 -1 并且 errno 设置为 EAGAIN 就表示所有连接都处理完。
比如如下代码:
for (; ;) {
socklen = sizeof(cliaddr);
if ((connfd = accept(listenfd, (struct sockaddr *) &cliaddr, &socklen)) < 0) {
//因为之前已经设置listenfd为非阻塞,所以当accept从已完成连接队列里取出所有连接后,
//会返回EAGAIN错误表示accept处理完了,否则只是返回-1表示连接出错
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break;
} else {
perror("accept error\n");
exit(1);
}
} else {
cout << "connection from " << inet_ntop(AF_INET, &cliaddr.sin_addr, buf, sizeof(cliaddr))
<< " ,port " << ntohs(cliaddr.sin_port) << endl;
//将连接套接字注册进epoll事件
set_nonblocking(connfd);
ev.data.fd = connfd;
ev.events = EPOLLIN | EPOLLET;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev) < 0) {
perror("epoll_ctl error\n");
exit(1);
}
}
}
八、简单的客户端服务器例子
这个例子出自《UNIX网络编程》这本书,是执行如下步骤的一个回射服务器:
(1)客户从标准输入读入一行文本,并写给服务器。
(2)服务器从网络输入读入这行文本,并回射给客户。
(3)客户从网络输入读入这行回射文本,并显示在标准输出上。
(一)客户端
//
// Created by huxijie on 17-4-25.
// TCP回射客户端
#include <iostream>
#include <unistd.h>
#include <netinet/in.h>
#include <string.h>
#include <arpa/inet.h>
#include <stdlib.h>
using namespace std;
const static int MAXLINE = 1024;
ssize_t Read(int fd,void *ptr,size_t nbytes) {
ssize_t n;
if ((n = read(fd, ptr, nbytes)) == -1) {
perror("read error\n");
}
return n;
}
void Write(int fd,void *ptr,size_t nbytes) {
if (write(fd, ptr, nbytes) != nbytes) {
perror("write error\n");
return;
}
}
//TCP回射客户程序
void str_cli(FILE *fp,int sockfd) {
int maxfdpl,stdineof;
fd_set rset;
char sendline[MAXLINE], recvline[MAXLINE];
int n;
stdineof = 0; //标志,只要该值为0,则每次在主循环中select标准输入的可读性
FD_ZERO(&rset);
for (; ;) {
if (stdineof == 0) {
FD_SET(fileno(fp), &rset);
}
FD_SET(sockfd, &rset);
maxfdpl = max(fileno(fp), sockfd) + 1;
if (select(maxfdpl, &rset, NULL, NULL, NULL) < 0) {
perror("select error\n");
return;
}
if (FD_ISSET(fileno(fp), &rset)) {
//当我们在标准输入上碰到EOF时,把新标志stdineof置为1,
//并调用shutdown来发送FIN,从fd_set中移除fp
if ((n = Read(fileno(fp), sendline, MAXLINE)) == 0) {
stdineof = 1;
shutdown(sockfd, SHUT_WR);
FD_CLR(fileno(fp), &rset);
continue;
}
Write(sockfd, sendline, n);
}
if (FD_ISSET(sockfd, &rset)) {
if ((n = Read(sockfd, recvline, MAXLINE)) == 0) {
if (1 == stdineof) {
return;
} else {
perror("str_cli:server terminated permaturely\n");
return;
}
}
Write(fileno(stdout), recvline, n);
}
}
}
int main(int argc,char **argv) {
int sockfd;
struct sockaddr_in servaddr;
const int PORT = 8080;
const char *IP = "127.0.0.1";
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("socket error\n");
}
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(PORT);
if (inet_pton(AF_INET, IP, &servaddr.sin_addr) <= 0) {
perror("inet_pton error\n");
}
if (connect(sockfd, (struct sockaddr *) &servaddr, sizeof(servaddr)) < 0) {
perror("connect error\n");
}
str_cli(stdin, sockfd);
exit(0);
}
(二)服务器
1.用多进程实现并发编程
//
// Created by huxijie on 17-4-25.
// TCP回射服务器程序
#include <iostream>
#include <unistd.h>
#include <netinet/in.h>
#include <string.h>
#include <arpa/inet.h>
#include <signal.h>
#include <sys/wait.h>
using namespace std;
const char *IP = "127.0.0.1";
const int PORT = 8080;
const static int MAXLINE = 1024;
const static int LISTENQ = 1024;
ssize_t writen(int fd,const void *vptr,size_t n) {
size_t nleft;
ssize_t nwriten;
const char *ptr;
nleft = n;
ptr = (const char *) vptr;
while (nleft > 0) {
if ((nwriten = write(fd, ptr, nleft)) <= 0) {
if (nwriten < 0 && errno == EINTR) {
nwriten = 0;
} else {
return -1;
}
}
nleft -= nwriten;
ptr += nwriten;
}
return n;
}
//在套接字上回射数据
void str_echo(int sockfd) {
ssize_t n;
char buf[MAXLINE];
again:
while ((n = read(sockfd, buf, MAXLINE)) > 0) {
writen(sockfd, buf, n);
}
if (n < 0 && errno == EINTR) {
goto again;
} else if (n < 0) {
perror("str_echo:read error\n");
}
}
typedef void Sigfunc(int);
Sigfunc *my_signal(int signo,Sigfunc *func) {
struct sigaction act, oact;
sigemptyset(&act.sa_mask);
act.sa_flags = 0;
act.sa_handler = func;
act.sa_flags |= SA_RESTART;
if (sigaction(signo, &act, & oact) < 0) {
return SIG_ERR;
} else {
return oact.sa_handler;
}
}
void sig_chld(int signo) {
pid_t pid;
int stat;
while ((pid = waitpid(-1, &stat, WNOHANG)) > 0) {
cout << "child " << pid << " termined" << endl;
}
return;
}
//多进程
int main(int argc,char **argv) {
int listenfd, connfd;
struct sockaddr_in servaddr, cliaddr;
socklen_t socklen = sizeof(servaddr);
pid_t pid;
char buff[MAXLINE];
if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("socket error\n");
}
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(PORT);
if (inet_pton(AF_INET, IP, &servaddr.sin_addr) <= 0) {
perror("inet_pton error\n");
}
if (bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr)) < 0) {
perror("bind error\n");
}
if (listen(listenfd, LISTENQ) < 0) {
perror("listen error\n");
}
my_signal(SIGCHLD, sig_chld);
while (true) {
if ((connfd = accept(listenfd, (struct sockaddr*)&cliaddr, &socklen)) < 0) {
if (errno == EINTR) {
continue;
} else {
perror("accept error\n");
}
}
if ((pid = fork()) == 0) {
if (close(listenfd) < 0) {
perror("close error\n");
}
cout << "connection from " << inet_ntop(AF_INET, &cliaddr.sin_addr, buff, MAXLINE)
<< " , port " << ntohs(cliaddr.sin_port) << endl;
str_echo(connfd);
if (close(connfd) < 0) {
perror("close error\n");
}
_exit(0);
}
if (close(connfd) < 0) {
perror("close error\n");
}
}
}
2.使用select
//
// Created by huxijie on 17-4-25.
// TCP回射服务器程序
#include <iostream>
#include <unistd.h>
#include <netinet/in.h>
#include <string.h>
#include <arpa/inet.h>
#include <signal.h>
#include <sys/wait.h>
using namespace std;
const static int MAXLINE = 1024;
const static int LISTENQ = 1024;
const char *IP = "127.0.0.1";
const int PORT = 8080;
ssize_t Read(int fd,void *ptr,size_t nbytes) {
ssize_t n;
if ((n = read(fd, ptr, nbytes)) == -1) {
perror("read error\n");
}
return n;
}
void Write(int fd,void *ptr,size_t nbytes) {
if (write(fd, ptr, nbytes) != nbytes) {
perror("write error\n");
return;
}
}
//将套接字设置为非阻塞方式
void set_nonblocking(int sockfd) {
int opts;
if ((opts = fcntl(sockfd, F_GETFL)) < 0) {
perror("fcntl(F_GETFL) error\n");
exit(1);
}
opts |= O_NONBLOCK;
if (fcntl(sockfd, F_SETFL, opts) < 0) {
perror("fcntl(F_SETFL) error\n");
exit(1);
}
}
//使用select
int main(int argc,char **argv) {
int i, maxi, maxfd, listenfd, connfd, sockfd;
int nready, client[FD_SETSIZE];
ssize_t n;
fd_set rset, allset;
char buf[MAXLINE];
socklen_t clilen;
struct sockaddr_in cliaddr, servaddr;
if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("socket error\n");
return -1;
}
set_nonblocking(listenfd);
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(PORT);
if (inet_pton(AF_INET, IP, &servaddr.sin_addr) <= 0) {
perror("inet_pton error\n");
return -1;
}
if (bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0) {
perror("bind error\n");
return -1;
}
if (listen(listenfd, LISTENQ) < 0) {
perror("listen error\n");
return -1;
}
maxfd = listenfd;
maxi = -1;
for (i = 0; i < FD_SETSIZE; i++) {
client[i] = -1;
}
FD_ZERO(&allset);
FD_SET(listenfd,&allset);
while (true) {
rset = allset;
if ((nready = select(maxfd + 1, &rset, NULL, NULL, NULL)) < 0) {
perror("select error\n");
return -1;
}
if (FD_ISSET(listenfd, &rset)) { //新客户端连接请求
clilen = sizeof(cliaddr);
if ((connfd = accept(listenfd, (struct sockaddr *) &cliaddr, &clilen)) < 0) {
perror("accept error\n");
return -1;
}
cout << "connection from " << inet_ntop(AF_INET, &cliaddr.sin_addr, buf, sizeof(buf))
<< ", port " << ntohs(cliaddr.sin_port) << endl;
for (i = 0; i < FD_SETSIZE; ++i) {
if (client[i] < 0) {
client[i] = connfd; //保存新的描述符
break;
}
}
if (i == FD_SETSIZE) {
perror("too many clients\n");
return -1;
}
FD_SET(connfd,&allset); //加入新的描述符到集合中
if (connfd > maxfd) {
maxfd = connfd;
}
if (i > maxi) {
maxi = i; //在client[]数组中最大下标
}
if (--nready <= 0) {
continue; //若就绪数目减1后变为0,则直接跳过后续操作进入下一次while中
}
}
for (i = 0; i <= maxi; ++i) { //检查所有已连接描述符是否可读
if ((sockfd = client[i]) < 0) {
continue;
}
if (FD_ISSET(sockfd, &rset)) {
if ((n = Read(sockfd, buf, MAXLINE)) == 0) {
if (close(sockfd) < 0) {
perror("close error\n");
return -1;
}
FD_CLR(sockfd,&allset); //客户关闭了连接,所以要从集合中去掉本连接描述符
client[i] = -1;
} else {
Write(sockfd, buf, n);
}
if (--nready <= 0) {
break; //若就绪数目减1后变为0,则直接跳过后续操作进入下一次while中
}
}
}
}
}
3.使用poll
和上面使用select的程序不同的只是main函数。
//使用poll
int main(int argc,char **argv) {
int i, maxi, listenfd, connfd, sockfd;
int nready;
ssize_t n;
char buf[MAXLINE];
socklen_t socklen;
struct sockaddr_in cliaddr, servaddr;
const int OPEN_MAX = sysconf(_SC_OPEN_MAX);
struct pollfd client[OPEN_MAX];
if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("socket error\n");
return -1;
}
set_nonblocking(listenfd);
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(PORT);
if (inet_pton(AF_INET, IP, &servaddr.sin_addr) < 0) {
perror("inet_pton error\n");
return -1;
}
if (bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr)) < 0) {
perror("bind error\n");
return -1;
}
if (listen(listenfd, LISTENQ) < 0) {
perror("listen error\n");
return -1;
}
client[0].fd = listenfd;
client[0].events = POLLRDNORM;
for (i = 1; i < OPEN_MAX; ++i) {
client[i].fd = -1;
}
maxi = 0;
while (true) {
if ((nready = poll(client, maxi + 1, -1)) < 0) {
perror("poll error\n");
return -1;
}
if (client[0].revents & POLLRDNORM) {
socklen = sizeof(cliaddr);
if ((connfd = accept(listenfd, (struct sockaddr *) &cliaddr, &socklen)) < 0) {
perror("accept error\n");
return -1;
}
cout << "connection from " << inet_ntop(AF_INET, &cliaddr.sin_addr, buf, sizeof(cliaddr))
<< " ,port " << ntohs(cliaddr.sin_port) << endl;
for (i = 1; i < OPEN_MAX; ++i) {
if (client[i].fd < 0) {
client[i].fd = connfd;
break;
}
}
if (i == OPEN_MAX) {
perror("too many clients\n");
return -1;
}
client[i].events = POLLRDNORM;
if (i > maxi) {
maxi = i;
}
if (--nready <= 0) {
continue;
}
}
for (i = 1; i <= maxi; ++i) {
if ((sockfd = client[i].fd) < 0) {
continue;
}
if (client[i].revents & (POLLRDNORM | POLLERR)) {
if ((n = read(sockfd, buf, MAXLINE-1)) < 0) {
if (errno == ECONNRESET) {
if (close(sockfd) < 0) {
perror("close error\n");
return -1;
}
client[i].fd = -1;
} else {
perror("read error\n");
}
} else if (0 == n) {
if (close(sockfd) < 0) {
perror("close error\n");
return -1;
}
client[i].fd = -1;
} else {
Write(sockfd, buf, n);
}
if (--nready <= 0) {
break;
}
}
}
}
}
4.使用epoll
//将套接字设置为非阻塞方式
void set_nonblocking(int sockfd) {
int opts;
if ((opts = fcntl(sockfd, F_GETFL)) < 0) {
perror("fcntl(F_GETFL) error\n");
exit(1);
}
opts |= O_NONBLOCK;
if (fcntl(sockfd, F_SETFL, opts) < 0) {
perror("fcntl(F_SETFL) error\n");
exit(1);
}
}
//使用epoll
int main(int argc,char **argv) {
const int OPEN_MAX = 1024;
const int MAXEVENTS = 64;
int epfd,listenfd,connfd,sockfd;
ssize_t n,nread,nwrite;
sockaddr_in servaddr, cliaddr;
socklen_t socklen;
struct epoll_event ev,events[MAXEVENTS]; //ev用于注册事件,数组用于回传要处理的事件
char buf[MAXLINE];
//创建listenfd监听套接字
if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("socket error\n");
exit(1);
}
//设置非阻塞
set_nonblocking(listenfd);
//创建一个epoll的句柄,该句柄占用一个fd值,所以epoll使用完后要关闭
if ((epfd = epoll_create(MAXEVENTS)) < 0) {
perror("epoll_create error\n");
exit(1);
}
//设置与要处理的事件相关的文件描述符
ev.data.fd = listenfd;
//设置要处理的事件类型
ev.events = EPOLLIN | EPOLLET;
//注册epoll事件
if (epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev) < 0) {
perror("epoll_ctl error\n");
exit(1);
}
//设置servaddr
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(PORT);
if (inet_pton(AF_INET, IP, &servaddr.sin_addr) < 0) {
perror("inet_pton error\n");
exit(1);
}
//绑定
if (bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr)) < 0) {
perror("bind error\n");
exit(1);
}
//监听
if (listen(listenfd, LISTENQ) < 0) {
perror("listen error\n");
exit(1);
}
int nfds;
for (; ;) {
//收集发生的事件,nfds是已经准备好的描述符个数
//下面的for循环只扫描已经准备好的描述符,这正是epoll比poll的高效之处
//定时器时间设置为-1表示一直等待直到有事件就绪
if ((nfds = epoll_wait(epfd, events, MAXEVENTS, -1)) < 0) {
perror("epoll_wait error\n");
exit(1);
}
for (int i = 0; i < nfds; ++i) {
sockfd = events[i].data.fd;
if (sockfd == listenfd) {
for (; ;) {
socklen = sizeof(cliaddr);
if ((connfd = accept(listenfd, (struct sockaddr *) &cliaddr, &socklen)) < 0) {
//因为之前已经设置listenfd为非阻塞,所以当accept从已完成连接队列里取出所有连接后,
//会返回EAGAIN错误表示accept处理完了,否则只是返回-1表示连接出错
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break;
} else {
perror("accept error\n");
exit(1);
}
} else {
cout << "connection from " << inet_ntop(AF_INET, &cliaddr.sin_addr, buf, sizeof(cliaddr))
<< " ,port " << ntohs(cliaddr.sin_port) << endl;
//将连接套接字注册进epoll事件
set_nonblocking(connfd);
ev.data.fd = connfd;
ev.events = EPOLLIN | EPOLLET;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev) < 0) {
perror("epoll_ctl error\n");
exit(1);
}
}
}
continue;
} else {
if (events[i].events & EPOLLIN) {
n = 0;
memset(buf, 0, sizeof(buf));
while ((nread = read(sockfd, buf + n, MAXLINE - 1)) > 0) {
n += nread;
}
if (nread == -1 && errno != EAGAIN) { //errno==EAGAIN表示缓冲队列已空
perror("read error\n");
exit(1);
}
//缓冲队列中的数据已经读取完毕,所以更新该fd为EPOLLOUT
ev.data.fd = sockfd;
ev.events = events[i].events | EPOLLOUT;
if (epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev) < 0) {
perror("epoll_ctl error\n");
exit(1);
}
}
if (events[i].events & EPOLLOUT) {
ssize_t data_size = strlen(buf);
n = data_size;
while (true) {
nwrite = write(sockfd, buf + data_size - n, n);
if (nwrite < 0) {
//当socket是非阻塞时,如返回此错误,表示写缓冲队列已满
//在这里做延时后再重试
if (errno == EAGAIN) {
usleep(1000);
continue;
}else {
perror("write error\n");
exit(1);
}
} else if (nwrite == n) { //表示写完了
break;
} else { //还没写完
n -= nwrite;
}
}
//数据发送完毕,更新该fd为EPOLLIN
ev.data.fd = sockfd;
ev.events = EPOLLIN | EPOLLET;
if (epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev) < 0) {
perror("epoll_ctl error\n");
exit(1);
}
}
}
}
}
}
以上4种方式所编写的程序实现的效果都是一样的。
先后运行服务器和客户端:
在客户端里输入一行字符串后,会回显该字符串:
当打开另一个客户端时:
在新的客户端里输入一行字符串:
参考:
《UNIX网络编程》卷1:套接字联网API
知乎:https://www.zhihu.com/question/19732473
historyasamirror:http://blog.csdn.net/historyasamirror/article/details/5778378
Sunshine_top:http://blog.csdn.net/u013074465/article/details/44964835
ljx0305:http://blog.csdn.net/ljx0305/article/details/4065058
mirkerson:http://blog.csdn.net/mirkerson/article/details/7468347
shuxiaogd:http://blog.csdn.net/shuxiaogd/article/details/50366039
zhang_shuai_2011:http://blog.csdn.net/zhang_shuai_2011/article/details/7675797
lvyilong316:http://blog.chinaunix.net/uid/28541347/cid-191916-list-3.html