
[epoll] Netty epoll's edge triggering and the JDK's level triggering

程序员文章站 2022-06-14 11:00:48

Linux epoll event model.

One, Introduction

    epoll is Linux's enhanced version of the select/poll I/O multiplexing interfaces (supported since kernel 2.6). It can significantly reduce a program's CPU usage when there are many concurrent connections of which only a few are active. One reason is that it reuses the registered set of file descriptors, so the developer does not have to rebuild the set of descriptors to watch before every wait. Another is that, when retrieving events, it does not need to scan the whole watched set; it only has to walk the descriptors that kernel I/O events have asynchronously woken up and placed on the ready queue.

    The difference between the select model and the epoll model in Linux:

    Suppose you live in a university dormitory building with many rooms, and a friend comes to visit you. The select-style dorm supervisor takes your friend from room to room until they find you. The epoll-style dorm supervisor writes down everyone's room number in advance, so she just tells your friend which room you are in and nobody has to search the building. If 10,000 visitors arrive, each looking for a classmate who lives in the building, it is obvious which supervisor is more efficient. Likewise, in a high-concurrency server, polling for I/O is one of the most time-consuming operations, and it is just as clear which of select and epoll performs better.
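
To make the "room to room" cost concrete, here is a minimal sketch of a select()-based wait. The helper name first_readable_select and its parameters are made up for illustration and are not part of the original article. It shows the two costs described above: the descriptor set is rebuilt before every call, and every descriptor is scanned afterwards.

```c
#include <stddef.h>
#include <sys/select.h>
#include <unistd.h>

// Hypothetical helper: waits up to timeout_ms for any of the nfds descriptors
// in fds[] to become readable. Returns the first ready one, or -1 on
// timeout or error.
int first_readable_select(const int *fds, int nfds, int timeout_ms) {
  fd_set readset;
  int maxfd = -1;

  // select() overwrites the set, so it has to be rebuilt before every call.
  FD_ZERO(&readset);
  for (int i = 0; i < nfds; i++) {
    FD_SET(fds[i], &readset);
    if (fds[i] > maxfd) maxfd = fds[i];
  }

  struct timeval tv;
  tv.tv_sec = timeout_ms / 1000;
  tv.tv_usec = (timeout_ms % 1000) * 1000;

  if (select(maxfd + 1, &readset, NULL, NULL, &tv) <= 0) {
    return -1;  // timeout or error
  }

  // select() does not list the ready descriptors: every one must be checked.
  for (int i = 0; i < nfds; i++) {
    if (FD_ISSET(fds[i], &readset)) return fds[i];
  }
  return -1;
}
```

With epoll, both loops disappear from the hot path: registration happens once, and the wait call hands back only the descriptors that are ready.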

Two, Details

 

The epoll interface is very simple, with three functions in total:

1. int epoll_create(int size);

    Creates an epoll handle; size tells the kernel roughly how many descriptors will be monitored. (Since Linux 2.6.8 the value is ignored, but it must still be greater than zero.) This parameter is unlike the first parameter of select(), which gives the highest monitored fd value plus one. Note that the epoll handle itself occupies a file descriptor; on Linux you can see it by looking in /proc/<process id>/fd/. So when you have finished using epoll you must call close() on it, otherwise the process may run out of file descriptors.
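
As a small illustration of that lifecycle, the sketch below pairs creation with the matching close(). The helper names open_epoll and close_epoll are hypothetical and exist only for this example.

```c
#include <assert.h>
#include <sys/epoll.h>
#include <unistd.h>

// Hypothetical helper: creates an epoll instance and returns its file
// descriptor, or -1 on error.
int open_epoll(int size_hint) {
  assert(size_hint > 0);  // epoll_create() rejects a non-positive size
  return epoll_create(size_hint);
}

// Hypothetical helper: releases the file descriptor held by the epoll
// instance. Returns 0 on success, -1 on error.
int close_epoll(int epfd) {
  return close(epfd);
}
```

Every successful open_epoll() call should eventually be matched by one close_epoll() call, in the same way as any other file descriptor.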

2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);

   The epoll event registration function: it registers the types of events to monitor on a descriptor.

   The first parameter is the value returned by epoll_create().

   The second parameter is the operation, expressed by one of three macros:

   EPOLL_CTL_ADD: register a new fd with epfd,

   EPOLL_CTL_MOD: change the monitored events of an fd that is already registered,

   EPOLL_CTL_DEL: remove an fd from epfd.

   The third parameter is the fd to monitor.

   The fourth parameter tells the kernel what to monitor, using the following struct epoll_event structure:

typedef union epoll_data {
  void *ptr;
  int fd;
  __uint32_t u32;
  __uint64_t u64;
} epoll_data_t;

struct epoll_event {
  __uint32_t events; /* Epoll events */
  epoll_data_t data; /* User data variable */
};

   events can be a combination of the following macros:

   EPOLLIN: the file descriptor is readable (this includes the peer socket being closed normally),

   EPOLLOUT: the file descriptor is writable,

   EPOLLPRI: the file descriptor has urgent data to read (that is, out-of-band data has arrived),

   EPOLLERR: an error occurred on the file descriptor,

   EPOLLHUP: the file descriptor was hung up,

   EPOLLET: put this descriptor in edge-triggered (Edge Triggered) mode, as opposed to the default level-triggered (Level Triggered) mode,

   EPOLLONESHOT: report only one event; after that event has been delivered, the socket must be put back into the epoll queue if you want to keep monitoring it.
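
The three operations and the flags above can be combined as in the following sketch. The helper names watch_oneshot, rearm_oneshot and unwatch are assumptions made for this example; only epoll_ctl() and the macros come from the interface described above.

```c
#include <stddef.h>
#include <sys/epoll.h>

// Hypothetical helper: start watching fd for input in edge-triggered,
// one-shot mode. Returns 0 on success, -1 on error (errno is set).
int watch_oneshot(int epfd, int fd) {
  struct epoll_event ev;
  ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
  ev.data.fd = fd;  // handed back unchanged by epoll_wait()
  return epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
}

// Hypothetical helper: re-arm fd after its one-shot event was delivered.
int rearm_oneshot(int epfd, int fd) {
  struct epoll_event ev;
  ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
  ev.data.fd = fd;
  return epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}

// Hypothetical helper: stop watching fd. The event argument is ignored for
// EPOLL_CTL_DEL; kernels older than 2.6.9 still required it to be non-NULL.
int unwatch(int epfd, int fd) {
  return epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
}
```

Because data is a union, only one of ptr, fd, u32 and u64 can be used per registration; this sketch stores the descriptor itself.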

3. int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

   Waits for events to occur. The events parameter receives the set of events from the kernel, and maxevents tells the kernel how large that array is; maxevents must be greater than zero, and traditionally it is kept no larger than the size given to epoll_create(). The timeout parameter is in milliseconds: 0 returns immediately, and -1 blocks indefinitely. The function returns the number of events to handle; a return value of 0 means the call timed out, and -1 indicates an error.
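
A minimal sketch of one wait call follows. The helper name collect_ready and the MAX_EVENTS value are assumptions for illustration; the sketch also assumes that each descriptor was registered with its own value stored in data.fd.

```c
#include <errno.h>
#include <sys/epoll.h>

#define MAX_EVENTS 16  // illustrative array size, not taken from the article

// Hypothetical helper: waits up to timeout_ms and copies the ready
// descriptors into ready[], which must have room for MAX_EVENTS entries.
// Returns the number of ready descriptors, 0 on timeout, or -1 on error.
int collect_ready(int epfd, int *ready, int timeout_ms) {
  struct epoll_event events[MAX_EVENTS];
  int n;

  do {
    n = epoll_wait(epfd, events, MAX_EVENTS, timeout_ms);
  } while (n < 0 && errno == EINTR);  // interrupted by a signal: wait again

  // Only the first n entries are filled in, so nothing else is scanned.
  for (int i = 0; i < n; i++) {
    ready[i] = events[i].data.fd;
  }
  return n;
}
```

Note that retrying after EINTR restarts the full timeout, which is acceptable for a sketch but may matter in code with strict deadlines.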

4. epoll has two event models:

Edge Triggered (ET): an event fires only when new data arrives, regardless of whether unread data is still sitting in the buffer.

Level Triggered (LT): an event fires as long as there is data available.

Consider this example:

1 We add a file descriptor used to read from a pipe (RFD) to the epoll descriptor

2 The other end of the pipe writes 2KB of data

3 We call epoll_wait(2), and it returns RFD, indicating that it is ready for reading

4 We read 1KB of data

5 We call epoll_wait(2) again......

Edge Triggered mode:

If the RFD was added to the epoll descriptor in step 1 with the EPOLLET flag, the call to epoll_wait(2) in step 5 will probably hang, even though data remains in the file's input buffer, while the sender may still be waiting for a response to the data it has already sent. In ET mode an event is reported only when something happens on the monitored file descriptor, so in step 5 the caller may end up waiting for data that is already sitting in the input buffer. In the example above, an event is generated on RFD because of the write in step 2, and that event is consumed in step 3. Because the read in step 4 does not drain the input buffer, it is uncertain whether the epoll_wait(2) call in step 5 will hang. When epoll works in ET mode you must use non-blocking sockets, so that a blocking read or write on one file descriptor does not starve the task that is handling multiple file descriptors. The recommended way to use the epoll interface in ET mode is as follows (possible pitfalls are described later):

I use non-blocking file descriptors

II go back to waiting for an event only after read(2) or write(2) returns EAGAIN. This does not mean that every read() has to be repeated until EAGAIN is seen: when read() returns fewer bytes than were requested, you can already conclude that the buffer holds no more data and treat the read event as fully handled.

Level Triggered mode:

By contrast, when it is used in LT mode the epoll interface is simply a faster poll(2), and it can be used wherever poll(2) is used, because the two have the same semantics.

Even in ET mode, multiple events can still be generated when multiple chunks of data are received. The caller can set the EPOLLONESHOT flag so that epoll disables the associated file descriptor once an event for it has been received by epoll_wait(2). When EPOLLONESHOT is set, it becomes the caller's responsibility to re-arm the file descriptor using epoll_ctl(2) with EPOLL_CTL_MOD.
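
The five steps of the pipe example can be tried out with a sketch like the one below. The function name second_wait_result is hypothetical. To avoid the hang described for ET mode, step 5 uses a timeout of 0, so the call only reports whether RFD would have been returned.

```c
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>

// Hypothetical helper: runs steps 1-5 on a fresh pipe and returns the result
// of the second epoll_wait() (1 = RFD reported again, 0 = not reported,
// -1 = a setup step failed). Pass EPOLLET for ET mode or 0 for LT mode.
int second_wait_result(unsigned int mode_flag) {
  int p[2];
  char buf[2048];
  struct epoll_event ev, out;
  int result = -1;

  if (pipe(p) < 0) return -1;
  int epfd = epoll_create(1);
  if (epfd < 0) { close(p[0]); close(p[1]); return -1; }

  ev.events = EPOLLIN | mode_flag;  // step 1: register RFD
  ev.data.fd = p[0];
  memset(buf, 'x', sizeof(buf));

  if (epoll_ctl(epfd, EPOLL_CTL_ADD, p[0], &ev) == 0 &&
      write(p[1], buf, sizeof(buf)) == (ssize_t)sizeof(buf) &&  // step 2: 2KB
      epoll_wait(epfd, &out, 1, 1000) == 1 &&                   // step 3
      read(p[0], buf, 1024) == 1024) {                          // step 4: 1KB
    // step 5: a zero timeout polls instead of hanging
    result = epoll_wait(epfd, &out, 1, 0);
  }

  close(epfd);
  close(p[0]);
  close(p[1]);
  return result;
}
```

On a typical Linux system, second_wait_result(EPOLLET) should return 0, because the remaining 1KB produces no new edge, while second_wait_result(0) should return 1, because in LT mode the unread data keeps the descriptor ready.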

Now a more detailed explanation of ET and LT:

LT (level-triggered) is the default mode and supports both blocking and non-blocking sockets. In this mode the kernel tells you that a file descriptor is ready, and you can then perform I/O on that ready fd. If you do nothing, the kernel keeps notifying you, so programs written in this mode are less likely to contain errors. The traditional select/poll are representatives of this model.

ET (edge-triggered) is the high-speed mode and only supports non-blocking sockets. In this mode the kernel tells you through epoll when a descriptor goes from not ready to ready. It then assumes you know the descriptor is ready and sends no more readiness notifications for it until you do something that makes the descriptor no longer ready (for example, a send or receive that transfers less than the requested amount and ends in an EWOULDBLOCK error). Note that if you never perform I/O on the fd (so that it never becomes not ready again), the kernel sends no further notifications (it notifies only once). For TCP, however, whether ET mode really speeds things up still needs more benchmarks to confirm. (I do not understand this last sentence.)

In many tests, when there are not many idle or dead connections, epoll is not much more efficient than select/poll; but when there are a lot of idle connections (for example, a WAN environment with a large number of slow connections), epoll turns out to be far more efficient than select/poll. (Not tested.)

In addition, when working in ET mode and an EPOLLIN event arrives, you need to look at the size returned by recv() when reading the data: if it equals the requested size, the buffer very likely still holds unread data, which means the event has not been fully handled, so you need to read again:

while(rs) {
  buflen = recv(activeevents[i].data.fd, buf, sizeof(buf), 0);
  if(buflen <0) {
    // The socket is non-blocking, so errno == EAGAIN means the buffer
    // currently has no more data to read.
    // The event is considered fully handled at this point.
    if(errno == EAGAIN) {
      break;
    } else {
      return;
    }
  } else if(buflen == 0) {
    // The peer has closed the socket, so there is nothing more to read.
    break;
  }
  if(buflen == sizeof(buf)) {
    rs = 1; // The buffer may still hold data: read again
  } else {
    rs = 0;
  }
}

Also, if the sending side produces data faster than the receiving side consumes it (that is, the epoll program reads faster than the socket it forwards to can send), then because the socket is non-blocking, send() returns even though the data in the buffer has not actually been delivered to the receiver. If the program keeps reading and sending, the buffer eventually fills up, send() fails with EAGAIN (see man send), and the data in that request is not sent. So you need to wrap send() in a socket_send() function that handles this case: it tries to send all of the data before returning, and returns -1 to indicate an error. Inside socket_send(), when the write buffer is full (send() returns -1 and errno is EAGAIN), it waits and tries again. This approach is not perfect, since in theory it can block inside socket_send() for a long time, but there is no better way.

ssize_t socket_send(int sockfd, const char* buffer, size_t buflen) {
  ssize_t tmp;
  size_t total = buflen;
  const char *p = buffer;
  while(1) {
    tmp = send(sockfd, p, total, 0);
    if(tmp <0) {
      // send() was interrupted by a signal before any data was written:
      // it returns -1, but the write can simply be retried.
      if(errno == EINTR)
        continue;
      // On a non-blocking socket this error means the write buffer is full;
      // wait a little here and try again.
      if(errno == EAGAIN) {
        usleep(1000);
        continue;
      }
      return -1;
    }
    // Everything that was still pending has been sent.
    if((size_t)tmp == total)
      return buflen;
    // Partial send: advance past the bytes that went out and keep going.
    total -= tmp;
    p += tmp;
  }
}

Three, Demo

1 Test environment

CentOS release 5.5, GCC version 4.1.2 20080704 (Red Hat 4.1.2-48)

2 Compile command

g++ myepoll.cpp lxx_net.cc -g -o myepoll

3 Source code

 

/*
 * myepoll.cpp
 *
 * Created on: 2013-06-03
 * Author: liuxiaoxian
 * Research on improving MS concurrency: data received from a client is sent back to it
 */

#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <iostream>

#include "lxx_net.h"

using namespace std;
#define MAX_EPOLL_SIZE 500
#define MAX_CLIENT_SIZE 500
#define MAX_IP_LEN      16
#define MAX_CLIENT_BUFF_LEN 1024
#define QUEUE_LEN 500
#define BUFF_LEN 1024
int fd_epoll = -1;
int fd_listen = -1;

// A client connection
typedef struct {
  int fd;                           // Connection handle
  char host[MAX_IP_LEN];           // IP address
  int port;                         // Port
  int len;                          // Number of bytes currently buffered
  char buff[MAX_CLIENT_BUFF_LEN];   // Data buffer
  bool  status;                     // true while this slot is in use
} client_t;

client_t *ptr_cli = NULL;

// Register fd with the epoll instance
int epoll_add(int fd_epoll, int fd, struct epoll_event *ev) {
  if (fd_epoll <0 || fd <0 || ev == NULL) {
    return -1;
  }

  if (epoll_ctl(fd_epoll, EPOLL_CTL_ADD, fd, ev) <0) {
    fprintf(stderr, "epoll_add failed(epoll_ctl)[fd_epoll:%d,fd:%d][%s]\n",
          fd_epoll, fd, strerror(errno));
    return -1;
  }

  fprintf(stdout, "epoll_add success[fd_epoll:%d,fd:%d]\n", fd_epoll, fd);
  return 0;
}

// Remove fd from the epoll instance and close it
int epoll_del(int fd_epoll, int fd) {
  if (fd_epoll <0 || fd <0) {
    return -1;
  }

  struct epoll_event ev_del;
  if (epoll_ctl(fd_epoll, EPOLL_CTL_DEL, fd, &ev_del) <0) {
    fprintf(stderr, "epoll_del failed(epoll_ctl)[fd_epoll:%d,fd:%d][%s]\n",
          fd_epoll, fd, strerror(errno));
    return -1;
  }
  close(fd);
  fprintf(stdout, "epoll_del success[epoll_fd:%d,fd:%d]\n", fd_epoll, fd);
  return 0;
}

// Receive data from the client in slot idx and echo it back
void do_read_data(int idx) {
  if (idx < 0 || idx >= MAX_CLIENT_SIZE) {
    return;
  }
  client_t *cli = &ptr_cli[idx];
  size_t pos = cli->len;

  // Leave one byte free so the buffer can be terminated and printed as a string
  ssize_t n = recv(cli->fd, cli->buff + pos, MAX_CLIENT_BUFF_LEN - pos - 1, 0);
  if (n > 0) { // Data received: echo everything buffered so far
    cli->len += n;
    cli->buff[cli->len] = '\0';
    fprintf(stdout, "[IP:%s,port:%d], data:%s\n", cli->host, cli->port, cli->buff);
    send(cli->fd, cli->buff, cli->len, 0);
    cli->len = 0;
  } else if (n == 0 || (errno != EAGAIN && errno != EINTR)) { // Peer closed the connection, or a real error
    fprintf(stdout, "The Client closed(read)[IP:%s,port:%d]\n", cli->host, cli->port);
    epoll_del(fd_epoll, cli->fd);
    cli->status = false;
  }
}

// Accept new connections
static void do_accept_client() {
  struct epoll_event ev;
  struct sockaddr_in cliaddr;
  socklen_t cliaddr_len = sizeof(cliaddr);

  int conn_fd = lxx_net_accept(fd_listen, (struct sockaddr *)&cliaddr, &cliaddr_len);
  if (conn_fd >= 0) {
    if (lxx_net_set_socket(conn_fd, false) != 0) {
      close(conn_fd);
      fprintf(stderr, "do_accept_client failed(setnonblock)[%s:%d]\n",
              inet_ntoa(cliaddr.sin_addr), ntohs(cliaddr.sin_port));
      return;
    }

    int i = 0;
    bool flag = true;

    // Look for a free client slot
    for (i = 0; i <MAX_CLIENT_SIZE; i++) {
      if (!ptr_cli[i].status) {
        ptr_cli[i].port = ntohs(cliaddr.sin_port);  // sin_port is in network byte order
        snprintf(ptr_cli[i].host, sizeof(ptr_cli[i].host), "%s", inet_ntoa(cliaddr.sin_addr));
        ptr_cli[i].len = 0;
        ptr_cli[i].fd = conn_fd;
        ptr_cli[i].status = true;
        flag = false;
        break;
      }
    }

    if (flag) { // No free client slot available
      close(conn_fd);
      fprintf(stderr, "do_accept_client failed(no unused client slot)[%s:%d]\n",
              inet_ntoa(cliaddr.sin_addr), ntohs(cliaddr.sin_port));
    } else {
      ev.events = EPOLLIN;
      ev.data.u32 = i | 0x10000000;
      if (epoll_add(fd_epoll, conn_fd, &ev) <0) {
        ptr_cli[i].status = false;
        close(ptr_cli[i].fd);
        fprintf(stderr, "do_accept_client failed(epoll_add)[%s:%d]\n",
                inet_ntoa(cliaddr.sin_addr), ntohs(cliaddr.sin_port));
        return;
      }

      fprintf(stdout, "do_accept_client success[%s:%d]\n",
              inet_ntoa(cliaddr.sin_addr), ntohs(cliaddr.sin_port));
    }
  }
}


int main(int argc, char **argv) {
  unsigned short port = 12345;
  if(argc == 2){
      port = atoi(argv[1]);
  }
  if ((fd_listen = lxx_net_listen(port, QUEUE_LEN)) <0) {
    fprintf(stderr, "listen port failed[%d]", port);
    return -1;
  }

  fd_epoll = epoll_create(MAX_EPOLL_SIZE);
  if (fd_epoll <0) {
    fprintf(stderr, "create epoll failed.%d\n", fd_epoll);
    close(fd_listen);
    return -1;
  }

  // Add the listening socket to the event set
  struct epoll_event ev;
  ev.events = EPOLLIN;
  ev.data.fd = fd_listen;

  if (epoll_add(fd_epoll, fd_listen, &ev) <0) {
    close(fd_epoll);
    close(fd_listen);
    fd_epoll = -1;
    fd_listen = -1;
    return -1;
  }

  ptr_cli = new client_t[MAX_CLIENT_SIZE]();  // value-initialized so every status starts as false
  struct epoll_event events[MAX_EPOLL_SIZE];
  for (;;) {
    int nfds = epoll_wait(fd_epoll, events, MAX_EPOLL_SIZE, 10);

    if (nfds <0) {
      int err = errno;
      if (err != EINTR) {
        fprintf(stderr, "epoll_wait failed[%s]", strerror(err));
      }
      continue;
    }

    for (int i = 0; i <nfds; i++) {
      if (events[i].data.u32 & 0x10000000) {
        // Receive data
        do_read_data(events[i].data.u32 & 0x0FFFFFFF);
      } else if(events[i].data.fd == fd_listen) {
        // Accept new connections
        do_accept_client();
      }
    }
  }
  return 0;
}



 

4 Results

 

[Image: run results]

 

Source:

https://www.programering.com/a/MzN4kTMwATg.html