欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

epoll 学习服务器的简单实现-原始epoll结构

程序员文章站 2022-03-14 12:20:55
...

1.begins~

学习linux编程很久,只知道网络编程是socket,bind, listen。。。,然而这些都是网络通信软件最基本的接口。在某网络公司待了y,也了解到公司的基础就是网络转发 ,然而网络转发实现并非我们平时所见的简单的send,recv。公司设备的转发都是建立在稳定并且高效的内部业务的基础上的,例如一个可靠性的服务:BFD(链路双向快速检测),进程内业务就是建立在内核与用户态,后台处理与前台配置交互的过程上实现。在做项目的日子里,渐渐的对公司BFD模块有一定了解,特别是涉及到框架的地方,仔细看发现,一些技巧其实就是我们平日所用的技能外加一些优化。对于单进程用户态来说,处理前台进程交互与内核上送消息,如果没有一个高效的机制,是很难处理上以千计甚至万计的消息的。在参看公司代码的初始化阶段,发现一共用了至少3 个epoll回调机制,一个是处理前台发送的配置及查询,一个是处理与其他应用模块的交互,另一个是想其他应用通报状态变化的机制。公司的epoll_event 机制不知道是不是修改了内核,结构大致是这样:



typedef  struct epoll_event {  
    uint32_t events;      // Epoll events  
    epoll_data_t data;      // User data variable 
    ep_callback  callback;    /* 这跟linux内核不一样 ? */
}EPOLL_PACK; 

/*
在添加epoll事件的时候就多了一个回调函数注册,这个十分方便了,回调函数可以用一个全局变量搞定,一个框架搭好,随后添加就大大的简单多了
*/

实际内核提供给我们使用的用户态结构是这样:

typedef union epoll_data {  
    void *ptr;  
    int fd;  
    uint32_t u32;  
    uint64_t u64;  
} epoll_data_t;

struct epoll_event {  
    uint32_t events;      // Epoll events  
    epoll_data_t data;      // User data variable  
};  

不过我们也可以将epoll_data_t这个联合体封装在一个结构,只需要用指针*ptr指向封装的带fd及回调函数就可以。打算自己学习一些必要的技能,暂且就记录一下吧

2.epoll API

extern int  epoll_create(int  size);
extern int  epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
extern int  epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);

① int epoll_create(int size);
创建一个epoll的句柄。自从linux2.6.8之后,size参数是被忽略的。需要注意的是,当创建好epoll句柄后,
它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll
后,必须调用close()关闭,否则可能导致fd被耗尽。
②int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epoll的事件注册函数,它不同于select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先
注册要监听的事件类型。 
参数:
第一个参数是epoll_create()的返回值。 
第二个参数表示动作,用三个宏来表示: 
EPOLL_CTL_ADD:注册新的fd到epfd中; 
EPOLL_CTL_MOD:修改已经注册的fd的监听事件; 
EPOLL_CTL_DEL:从epfd中删除一个fd; 
  
第三个参数是需要监听的fd。 
第四个参数是告诉内核需要监听什么事,struct epoll_event结构如下:

enum EPOLL_EVENTS{

        EPOLLIN = 0x001,

#define EPOLLIN EPOLLIN

        EPOLLPRI = 0x002,

#define EPOLLPRI EPOLLPRI

        EPOLLOUT = 0x004,

#define EPOLLOUT EPOLLOUT

        EPOLLRDNORM = 0x040,

#define EPOLLRDNORM EPOLLRDNORM

        EPOLLRDBAND = 0x080,

#define EPOLLRDBAND EPOLLRDBAND

        EPOLLWRNORM = 0x100,

#define EPOLLWRNORM EPOLLWRNORM

        EPOLLWRBAND = 0x200,

#define EPOLLWRBAND EPOLLWRBAND

        EPOLLMSG = 0x400,

#define EPOLLMSG EPOLLMSG

        EPOLLERR = 0x008,

#define EPOLLERR EPOLLERR

        EPOLLHUP = 0x010,

#define EPOLLHUP EPOLLHUP

        EPOLLRDHUP = 0x2000,

#define EPOLLRDHUP EPOLLRDHUP

        EPOLLWAKEUP = 1u << 29,

#define EPOLLWAKEUP EPOLLWAKEUP

        EPOLLONESHOT = 1u << 30,

#define EPOLLONESHOT EPOLLONESHOT

        EPOLLET = 1u << 31

#define EPOLLET EPOLLET

};

(3)epoll_wait();

参数events是分配好的epoll_event结构体数组;
epoll将会把发生的事件赋值到events数组中(events不可以是空指针,内核只负责把数据复制到)
参数timeout 是超时时间(毫秒,0表示立即返回,-1表示永久阻塞);
如果函数调用成功,返回对应IO上已经准备好的文件描述符数目;如果返回0,表示已超时;如果返回值小于0,表示函数失败。

3.epoll mode

epoll的使用也很方便,基本都会用到一下模型,取自linux帮助手册:


/*
Example for suggested usage
While  the  usage  of  epoll when employed as a level-triggered interface does have the same semantics as poll(2), the edge-triggered usage requires more
clarification to avoid stalls in the application event loop.  In this example, listener is a nonblocking socket on which listen(2) has been called.   The
function  do_use_fd()  uses the new ready file descriptor until EAGAIN is returned by either read(2) or write(2).  An event-driven state machine applica‐
tion should, after having received EAGAIN, record its current state so that at the next call to do_use_fd() it will continue to read(2) or write(2)  from
where it stopped before.*/

#define MAX_EVENTS 10
struct epoll_event ev, events[MAX_EVENTS];
int listen_sock, conn_sock, nfds, epollfd;

/* Code to set up listening socket, 'listen_sock',
  (socket(), bind(), listen()) omitted */

static Epoll_Case()
{

       

       epollfd = epoll_create1(0);
       if (epollfd == -1) {
           perror("epoll_create1");
           exit(EXIT_FAILURE);
       }

       ev.events = EPOLLIN;
       ev.data.fd = listen_sock;
       if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev) == -1) {
           perror("epoll_ctl: listen_sock");
           exit(EXIT_FAILURE);
       }

       
       for (;;) {
                      nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
                      if (nfds == -1) {
                          perror("epoll_wait");
                          exit(EXIT_FAILURE);
                      }
       
                      for (n = 0; n < nfds; ++n) {
                          if (events[n].data.fd == listen_sock) {
                              conn_sock = accept(listen_sock,
                                              (struct sockaddr *) &local, &addrlen);
                              if (conn_sock == -1) {
                                  perror("accept");
                                  exit(EXIT_FAILURE);
                              }
                              setnonblocking(conn_sock);
                              ev.events = EPOLLIN | EPOLLET;
                              ev.data.fd = conn_sock;
                              if (epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock,
                                          &ev) == -1) {
                                  perror("epoll_ctl: conn_sock");
                                  exit(EXIT_FAILURE);
                              }
                          } else {
                              do_use_fd(events[n].data.fd);
                          }
                      }
                  }
}

/*           
When  used  as an edge-triggered interface, for performance reasons, it is possible to add the file descriptor inside the epoll interface (EPOLL_CTL_ADD)
once by specifying (EPOLLIN|EPOLLOUT).  This allows you  to  avoid  continuously  switching  between  EPOLLIN  and  EPOLLOUT  calling  epoll_ctl(2)  with
EPOLL_CTL_MOD.
*/

下面给出一个epoll 服务器的简单例子,参考了网上不少方式,自己写了一个,算不上原创,把BIT_TEST部分注释打开,可以忽略basetype.h,可以编译通过。创建socket的地方可以自己优化省略,我实现的时候觉得这种方式比较容易看清楚socket的获取方式。

#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <poll.h>
#include <assert.h>
#include <unistd.h>
#include <fcntl.h>


#include "basetype.h"



static int g_Main_Epoll_Fd = -1;
#define MAX_EPOLL_EVENT_COUNT 64
#define MAX_LISTEN_NUM  10
#define MAX_BUFF_LEN 512


/*
#define BIT_TEST(a, b)          ((a) & (b))
#define BIT_RESET(a, b)         ((a) ~= (b))
#define BIT_SET(a, b)           ((a) |= (b))
#define BIT_MATCH(a,b)          ((a)&(b) = (b))
#define BIT_COMPARE(a, b, c)    ((a)&(b) == (c))
*/


static void  print_socket_info(struct addrinfo *ai)
{
    char ipstr[INET6_ADDRSTRLEN];
    uint16_t port;
    void *addr  = NULL;
    char *ipver = NULL;
    struct sockaddr_in *ipv4 = NULL;
    struct sockaddr_in6 *ipv6 = NULL;

    assert(NULL != ai);
    
    if (ai->ai_family == AF_INET) 
    { // IPv4
        ipv4 = (struct sockaddr_in *)ai->ai_addr;
        addr = &(ipv4->sin_addr);
        ipver = "IPv4";
        port =ntohs(((struct sockaddr_in*)ai->ai_addr)->sin_port);
    } 
    else 
    { // IPv6
        ipv6 = (struct sockaddr_in6 *)ai->ai_addr;
        addr = &(ipv6->sin6_addr);
        ipver = "IPv6";
        port =ntohs(((struct sockaddr_in6*)ai->ai_addr)->sin6_port);
    }

    // convert the IP to a string and print it:
    (void)inet_ntop(ai->ai_family, addr, ipstr, sizeof ipstr);
    printf(" server initing... AF: %s IP: %s,PORT: %u\n", ipver, ipstr,port);

    return ;
}

static int SetSocketNoblocking(const int sockfd)
{
    int flags = 0;
    int iret  = 0;

    flags = fcntl (sockfd, F_GETFL, 0);
    if (flags == -1)
    {
        perror ("fcntl");
        return -1;
    }
 
    flags |= O_NONBLOCK;
    iret = fcntl (sockfd, F_SETFL, flags);
    if (iret == -1)
    {
        perror ("fcntl");
        return -1;
    }

    return 0;
}


static int Epoll_Control(const int sockfd, const int oper, const int events)
{
    struct epoll_event ep_event;
    int iret = 0;

    memset(&ep_event, 0, sizeof(ep_event));
    ep_event.events = events;
    ep_event.data.fd = sockfd;

    iret = epoll_ctl(g_Main_Epoll_Fd, oper, sockfd, &ep_event);
    if (0 > iret)
    {
        perror("epoll_ctl error.");
        printf("epoll_ctl error opt:%d\n", oper);
        return -1;
    }

    return 0;
}


static int CreateServerScoket(const char *host, const char *port)
{
    struct addrinfo hints;
    struct addrinfo*ailist= NULL; 
    struct addrinfo *ai= NULL;
    int iret;
    int sockfd = 0;
    int sockoptval = 1;
     
    memset(&hints, 0, sizeof(hints));
    hints.ai_family = AF_UNSPEC; // AF_INET 或 AF_INET6 
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE;     /* All interfaces */
     
    iret = getaddrinfo(host, port, &hints, &ailist);
    if (0 != iret)
    {
        fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(iret));
        return -1;
    }
     
    for(ai = ailist;ai != NULL; ai = ai->ai_next) 
    {

        sockfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
        if (0 > sockfd)
        {
            continue;
        }
    
        /* set reuse addr */
        iret = setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &sockoptval, sizeof(sockoptval));
        if (0 > iret)
        {
            perror("setsockopt error\n");
            freeaddrinfo(ailist);
            return -1;
        }
    
        iret = bind(sockfd, ai->ai_addr, ai->ai_addrlen);/*sizeof(struct sockaddr)*/
        if (0 == iret)
        {
            print_socket_info(ai);
            break;
        }
        else
        {
            close(sockfd);
        }
    }

    if (NULL == ai)
    {
        fprintf(stderr, "get server socket error.\n");
        return -1;
    }
    
    freeaddrinfo(ailist); 

    /* set no-block mode */
    iret = SetSocketNoblocking(sockfd);
    if (0 != iret)
    {
        return -1;
    }

    /* listen socket */

    iret = listen(sockfd, MAX_LISTEN_NUM);
    if (0  > iret)
    {
        perror("listen socket error.\n");
        return -1;
    }

    printf("server socket init success. listenning...\n");

    return sockfd;
}


static void HandleAccept(const int listen_scok, int Event)
{
    
    struct sockaddr_in addr;
    socklen_t  len = sizeof(addr);
    int client_sock  = -1;

    if (BIT_TEST(EPOLLIN, Event))
    {
        client_sock = accept(listen_scok, (struct sockaddr *)&addr, &len);
        if(client_sock < 0 )
        {
        	perror("accept infd error");
        	return;
        }

        (void)Epoll_Control(client_sock, EPOLL_CTL_ADD, EPOLLIN);
    }

    return ;
}

static void ProcRecvMsg(int sockfd)
{
    ssize_t recvlen = 0;
    char recvBuf[MAX_BUFF_LEN];

    
    //read ev  ready;
    recvlen = recv(sockfd, recvBuf, sizeof(recvBuf), 0);
    if(recvlen < 0)
    {
    	perror("recv error.");
    	return ;
    }
    else if(recvlen == 0)
    {
    	printf("client quit\n");	
        (void)Epoll_Control(sockfd, EPOLL_CTL_DEL, 0);
    	close(sockfd);
    }
    else
    {
        (void)Epoll_Control(sockfd, EPOLL_CTL_MOD, EPOLLOUT);
    	printf("recv msg fprom client:msg#:%s\n",recvBuf);
    }

    return ;
}

static void SendMsg2Client(const int sockfd)
{
    ssize_t sendlen = 0;
    char sendBuf[MAX_BUFF_LEN];
    int event = 0;
    
    sprintf(sendBuf, "HTTP/1.0 200 OK\r\n\r\n<html><h1>Hello Epoll! [client_fd:%d]</h1></html>", sockfd);  
    int sendsize = send(sockfd, sendBuf, strlen(sendBuf)+1, 0);  
    if(sendsize <= 0)  
    {  
        perror("send error.");
        (void)Epoll_Control(sockfd, EPOLL_CTL_DEL, 0);
        close(sockfd);
    }  
    else  
    {  
        printf("Server reply msg ok! data: %s\n", sendBuf);  
        event = EPOLLIN | EPOLLERR | EPOLLHUP;
        (void)Epoll_Control(sockfd, EPOLL_CTL_MOD, event);
    }  

    return ;
}

int main(int argc, char *argv[])
{
    int listenfd =0;
    int epfd = 0;
    int iret = 0;
    int event = 0;
    struct epoll_event ep_events[MAX_EPOLL_EVENT_COUNT];
    int ev_num = 0;
    int in_sockfd = 0;
    int loop = 0;

    if (3 > argc)
    {
        printf("Usage: %s [ip_addr] [port]\n",argv[0]);
        return -1;
    }

    listenfd = CreateServerScoket(argv[1], argv[2]);
    if (0 > listenfd)
    {
        return -1;
    }

    
    epfd = epoll_create(1);
    if (-1 == epfd)
    {
        perror("create epoll error.\n");
        return -1;
    }
    else
    {
        g_Main_Epoll_Fd = epfd;
    }

    event = EPOLLIN;
    iret = Epoll_Control(listenfd, EPOLL_CTL_ADD, event);
    if (0 != iret)
    {
        return -1;
    }

    /* proc epoll event regined */
    printf("server init success, epoll wait now.\n");
    
    for(;;)
    {
        memset(ep_events, 0, sizeof(ep_events));
        ev_num = epoll_wait(g_Main_Epoll_Fd, ep_events, MAX_EPOLL_EVENT_COUNT, -1);

        switch (ev_num)
        {
            case 0:
            {
                printf("epoll_wait timeout.\n");
                break;
            }
            case -1:
            {
                perror("epoll_wait error.\n");
                break;
            }

            default:
            {
                for (loop = 0; loop < ev_num; loop++)
                {
                    in_sockfd = ep_events[loop].data.fd;
                    event = ep_events[loop].events;
                    if (in_sockfd == listenfd)
                    {
                        HandleAccept(in_sockfd, event);
                    }
                    else
                    {
                        if (BIT_TEST(EPOLLIN, event))
                        {
                            /* have data to read */
                            ProcRecvMsg(in_sockfd);
                        }
                        else if (BIT_TEST(EPOLLOUT, event))
                        {
                            SendMsg2Client(in_sockfd);
                        }
                    }
                }
                
                break;
            }
        }
        
    }
    
    return 0;
}

结果如下,客户端任意都可以:注意,运行时需要手动输入服务器主机名及端口号。

epoll 学习服务器的简单实现-原始epoll结构

 

 

 

相关标签: epoll linux