欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

epoll的简单使用

程序员文章站 2024-03-23 10:10:28
...

epoll的功能:实现I/O复用,即多路I/O。

一、epoll的系统调用函数

epoll只有epoll_create,epoll_ctl 和 epoll_wait 3个系统调用函数

1、epoll_create(int size)

创建一个epoll的句柄,size表示内核监听的数目量。

注意:当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

函数声明:epoll_create(int size);

注释:生成一个epoll的专用描述符,其实是在内核中申请一块空间,用来监听想要监听的socket fd上发生了什么事件。size是epoll监听的最大socket fd数目。

2、epoll_ctl (int epfd, int op, int fd, struct epoll_event *event)

将被监听的描述符添加到epoll句柄或从epool句柄中删除或者对监听事件进行修改。该函数用于控制某个epoll文件描述符上的事件,可以注册事件,修改事件,删除事件。
函数声明:epoll_ctl (int epfd, int op, int fd, struct epoll_event *event);

参数:
epfd:由 epoll_create 生成的epoll专用的文件描述符;
op:要进行的操作例如注册事件,可能的取值EPOLL_CTL_ADD 注册、EPOLL_CTL_MOD 修改、EPOLL_CTL_DEL 删除;
fd:关联的文件描述符;
event:指向epoll_event的指针,即在内核中需要监听什么事件。

返回值:成功返回0,失败返回1。

// structepoll_event 的结构体:
typedef union epoll_data 
{  
  void *ptr;  
  int fd;  
  __uint32_t u32;  
  __uint64_t u64;  
} epoll_data_t;  
struct epoll_event 
{
  __uint32_t events; /* Epoll events */  
  epoll_data_t data; /* User data variable */  
};

events可以是以下几个宏的集合:

  1. EPOLLIN: 触发该事件,表示对应的文件描述符上有可读数据。(包括对端SOCKET正常关闭);
  2. EPOLLOUT: 触发该事件,表示对应的文件描述符上可以写数据;
  3. EPOLLPRI: 表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
  4. EPOLLERR: 表示对应的文件描述符发生错误;
  5. EPOLLHUP: 表示对应的文件描述符被挂断;
  6. EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
  7. EPOLLONESHOT: 只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里。
// 案例:
//定义epoll_event事件的变量
struct epoll_event ev;
//设置与要处理的事件相关的文件描述符
ev.data.fd=listenfd;
//设置要处理的事件类型
ev.events=EPOLLIN|EPOLLET;
//注册epoll事件
epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);

3、int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)

等待事件的发生。参数events用来从内核得到事件的集合,maxevents表示内核events有多大(数组成员的个数),这个maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。
函数声明:int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

参数:
epfd:由epoll_create 生成的epoll专用的文件描述符;
epoll_event:用于回传代处理事件的数组;
maxevents:每次能处理的事件数;
timeout:等待I/O事件发生的超时值(单位我也不太清楚);-1相当于阻塞,0相当于非阻塞。一般用-1即可返回发生事件数。

返回值:成功返回处理事件的数目,失败返回0,表示已超时;

二、epoll的两种模式ET和LT

假如有这样一个例子:(LT方式,即默认方式下,内核会继续通知,可以读数据,ET方式,内核不会再通知,可以读数据)

  1. 我们已经把一个用来从管道中读取数据的文件句柄(RFD)添加到epoll描述符
  2. 这个时候从管道的另一端被写入了2KB的数据
  3. 调用epoll_wait(2),并且它会返回RFD,说明它已经准备好读取操作
  4. 然后我们读取了1KB的数据
  5. 调用epoll_wait(2)......
Edge Triggered工作模式:

如果我们在第1步将RFD添加到epoll描述符的时候使用了EPOLLET标志,那么在第5步调用epoll_wait(2)之后将有可能会挂起,因为剩余的数据还存在于文件的输入缓冲区内,而且数据发出端还在等待一个针对已经发出数据的反馈信息。只有在监视的文件句柄上发生了某个事件的时候ET工作模式才会汇报事件。因此在第5步的时候,调用者可能会放弃等待仍在存在于文件输入缓冲区内的剩余数据。在上面的例子中,会有一个事件产生在RFD句柄上,因为在第2步执行了一个写操作,然后,事件将会在第3步被销毁。因为第4步的读取操作没有读空文件输入缓冲区内的数据,因此我们在第5步调用epoll_wait(2)完成后,是否挂起是不确定的。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。最好以下面的方式调用ET模式的epoll接口,在后面会介绍避免可能的缺陷。(LT方式可以解决这种缺陷)
i 基于非阻塞文件句柄
ii 只有当read(2)或者write(2)返回EAGAIN时(认为读完)才需要挂起,等待。但这并不是说每次read()时都需要循环读,直到读到产生一个EAGAIN才认为此次事件处理完成,当read()返回的读到的数据长度小于请求的数据长度时(即小于sizeof(buf)),就可以确定此时缓冲中已没有数据了,也就可以认为此事读事件已处理完成。

Level Triggered工作模式(默认的工作方式)

以LT方式调用epoll接口的时候,它就相当于一个速度比较快的poll(2),并且无论后面的数据是否被使用,因此他们具有同样的职能。因为即使使用ET模式的epoll,在收到多个chunk的数据的时候仍然会产生多个事件。调用者可以设定EPOLLONESHOT标志,在epoll_wait(2)收到事件后epoll会与事件关联的文件句柄从epoll描述符中禁止掉。因此当EPOLLONESHOT设定后,使用带有EPOLL_CTL_MOD标志的epoll_ctl(2)处理文件句柄就成为调用者必须作的事情。

ET和LT的区别:
  1. LT(leveltriggered)是缺省的工作方式,并且同时支持block和no-blocksocket。在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。
  1. ET(edge-triggered)是高速工作方式,只支持no-blocksocket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了。
// epoll的常规用法
 for(;;)
 {  
           nfds = epoll_wait(kdpfd, events, maxevents, -1);  
           for(n = 0; n < nfds; ++n) 
          {  
               if(events[n].data.fd == listenfd) //有新的连接
               {  
                   client = accept(listener, (struct sockaddr *) &local,  
                                   &addrlen);   //accept这个连接
                   if(client < 0)
                  {  
                       perror("accept");  
                       continue;  
                   }  
                   setnonblocking(client);  
                   ev.events = EPOLLIN | EPOLLET;  
                   ev.data.fd = client;  
                   //将新的fd添加到epoll的监听队列中
                   if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, client, &ev) < 0) 
                  {  
                       fprintf(stderr, "epoll set insertion error: fd=%d\n",  
                               client);  
                       return -1;  
                   }  
               }  
               else if( events[i].events&EPOLLIN ) //接收到数据,读socket
               {
                    n = read(sockfd, line, MAXLINE)) < 0    //读
                    ev.data.ptr = md;     //md为自定义类型,添加数据
                    ev.events=EPOLLOUT|EPOLLET;
                    epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);//修改标识符,等待下一个循环时发送数据,异步处理的精髓
                }
                else if(events[i].events&EPOLLOUT) //有数据待发送,写socket
                {
                    struct myepoll_data* md = (myepoll_data*)events[i].data.ptr;    //取数据
                    sockfd = md->fd;
                    send( sockfd, md->ptr, strlen((char*)md->ptr), 0 ); //发送数据  
                    ev.data.fd=sockfd;
                    ev.events=EPOLLIN|EPOLLET;
                    epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //修改标识符,等待下一个循环时接收数据
                }
                else  
                {
                    // 其他处理; 
                }
           }  
}  
// 案例:
#include <iostream>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <errno.h>
using namespace std;
#define MAXLINE 5
#define OPEN_MAX 100
#define LISTENQ 20
#define SERV_PORT 5000
#define INFTIM 1000

void set_non_blocking(int sock) // 非阻塞
{
    int opts;
    opts = fcntl(sock, F_GETFL);
    if (opts < 0)
    {
        perror("fcntl(sock, GETFL)");
        exit(1);
    }
    opts = opts|O_NONBLOCK;
    if (fcntl(sock, F_SETFL, opts) < 0)
    {
        perror("fcntl(sock, SETFL, opts)");
        exit(1);
    }
}

int main(int argc, char* argv[])
{
    int i, maxi, listenfd, connfd, sockfd, epfd, nfds, portnumber;
    ssize_t n;
    char line[MAXLINE];
    socklen_t clilen;

    if ( 2 == argc )
    {
        if((portnumber = atoi(argv[1])) < 0 )
        {
            fprintf(stderr, "Usage:%s portnumber/a/n", argv[0]);
            return 1;
        }
    }
    else
    {
        fprintf(stderr, "Usage:%s portnumber/a/n", argv[0]);
        return 1;
    }

    //声明epoll_event结构体的变量,ev用于注册事件,数组用于回传要处理的事件
    struct epoll_event ev, events[20];
    //生成用于处理accept的epoll专用的文件描述符
    epfd=epoll_create(256);
    struct sockaddr_in clientaddr;
    struct sockaddr_in serveraddr;
    listenfd = socket(AF_INET, SOCK_STREAM, 0);
    //把socket设置为非阻塞方式
    //setnonblocking(listenfd);
    //设置与要处理的事件相关的文件描述符
    ev.data.fd = listenfd;
    //设置要处理的事件类型
    ev.events = EPOLLIN|EPOLLET;
    //ev.events = EPOLLIN;
    //注册epoll事件
    epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);
    bzero(&serveraddr, sizeof(serveraddr));
    serveraddr.sin_family = AF_INET;
    char *local_addr = "127.0.0.1";
    inet_aton(local_addr, &(serveraddr.sin_addr)); //htons(portnumber);
    serveraddr.sin_port = htons(portnumber);
    bind(listenfd, (sockaddr *)&serveraddr, sizeof(serveraddr));
    listen(listenfd, LISTENQ);
    maxi = 0;

    for ( ; ; ) 
    {
        //等待epoll事件的发生
        nfds = epoll_wait(epfd,events,20,500);
        //处理所发生的所有事件
        for(i=0; i<nfds; ++i)
        {
            if (events[i].data.fd == listenfd)//如果新监测到一个SOCKET用户连接到了绑定的SOCKET端口,建立新的连接。
            {
                connfd = accept(listenfd, (sockaddr *)&clientaddr, &clilen);
                if(connfd<0){
                    perror("connfd<0");
                    exit(1);
                }
                //setnonblocking(connfd);
                char *str = inet_ntoa(clientaddr.sin_addr);
                cout << "accapt a connection from " << str << endl;
                //设置用于读操作的文件描述符
                ev.data.fd=connfd;
                //设置用于注测的读操作事件
                ev.events=EPOLLIN|EPOLLET;
                //ev.events=EPOLLIN;
                //注册ev
                epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev);
            }
            //如果是已经连接的用户,并且收到数据,那么进行读入。
            else if (events[i].events&EPOLLIN) 
            {
                cout << "EPOLLIN" << endl;
                if ((sockfd = events[i].data.fd) < 0)
                    continue;
                if ( (n = read(sockfd, line, MAXLINE)) < 0) 
                {
                    if (errno == ECONNRESET) 
                    {
                        close(sockfd);
                        events[i].data.fd = -1;
                    } 
                    else
                    {
                        std::cout << "readline error" << std::endl;
                    }
                } 
                else if (n == 0) 
                {
                    close(sockfd);
                    events[i].data.fd = -1;
                }
                line[n] = '/0';
                std::cout << "read " << line << std::endl;
                //设置用于写操作的文件描述符
                ev.data.fd = sockfd;
                //设置用于注测的写操作事件
                ev.events = EPOLLOUT|EPOLLET;
                //修改sockfd上要处理的事件为EPOLLOUT
                //epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd,&ev);
            }
            else if (events[i].events&EPOLLOUT)  //如果有数据发送
            {
                sockfd = events[i].data.fd;
                write(sockfd, line, n);
                //设置用于读操作的文件描述符
                ev.data.fd=sockfd;
                //设置用于注测的读操作事件
                ev.events=EPOLLIN|EPOLLET;
                //修改sockfd上要处理的事件为EPOLIN
                epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);
            }
        }
    }
    return 0;
}