epoll 学习服务器的简单实现-原始epoll结构
1.begins~
学习linux编程很久,只知道网络编程是socket,bind, listen。。。,然而这些都是网络通信软件最基本的接口。在某网络公司待了y,也了解到公司的基础就是网络转发 ,然而网络转发实现并非我们平时所见的简单的send,recv。公司设备的转发都是建立在稳定并且高效的内部业务的基础上的,例如一个可靠性的服务:BFD(链路双向快速检测),进程内业务就是建立在内核与用户态,后台处理与前台配置交互的过程上实现。在做项目的日子里,渐渐的对公司BFD模块有一定了解,特别是涉及到框架的地方,仔细看发现,一些技巧其实就是我们平日所用的技能外加一些优化。对于单进程用户态来说,处理前台进程交互与内核上送消息,如果没有一个高效的机制,是很难处理上以千计甚至万计的消息的。在参看公司代码的初始化阶段,发现一共用了至少3 个epoll回调机制,一个是处理前台发送的配置及查询,一个是处理与其他应用模块的交互,另一个是想其他应用通报状态变化的机制。公司的epoll_event 机制不知道是不是修改了内核,结构大致是这样:
typedef struct epoll_event {
uint32_t events; // Epoll events
epoll_data_t data; // User data variable
ep_callback callback; /* 这跟linux内核不一样 ? */
}EPOLL_PACK;
/*
在添加epoll事件的时候就多了一个回调函数注册,这个十分方便了,回调函数可以用一个全局变量搞定,一个框架搭好,随后添加就大大的简单多了
*/
实际内核提供给我们使用的用户态结构是这样:
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; // Epoll events
epoll_data_t data; // User data variable
};
不过我们也可以将epoll_data_t这个联合体封装在一个结构,只需要用指针*ptr指向封装的带fd及回调函数就可以。打算自己学习一些必要的技能,暂且就记录一下吧
2.epoll API
extern int epoll_create(int size);
extern int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
extern int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);
① int epoll_create(int size);
创建一个epoll的句柄。自从linux2.6.8之后,size参数是被忽略的。需要注意的是,当创建好epoll句柄后,
它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll
后,必须调用close()关闭,否则可能导致fd被耗尽。
②int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epoll的事件注册函数,它不同于select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先
注册要监听的事件类型。
参数:
第一个参数是epoll_create()的返回值。
第二个参数表示动作,用三个宏来表示:
EPOLL_CTL_ADD:注册新的fd到epfd中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从epfd中删除一个fd;
第三个参数是需要监听的fd。
第四个参数是告诉内核需要监听什么事,struct epoll_event结构如下:
enum EPOLL_EVENTS{
EPOLLIN = 0x001,
#define EPOLLIN EPOLLIN
EPOLLPRI = 0x002,
#define EPOLLPRI EPOLLPRI
EPOLLOUT = 0x004,
#define EPOLLOUT EPOLLOUT
EPOLLRDNORM = 0x040,
#define EPOLLRDNORM EPOLLRDNORM
EPOLLRDBAND = 0x080,
#define EPOLLRDBAND EPOLLRDBAND
EPOLLWRNORM = 0x100,
#define EPOLLWRNORM EPOLLWRNORM
EPOLLWRBAND = 0x200,
#define EPOLLWRBAND EPOLLWRBAND
EPOLLMSG = 0x400,
#define EPOLLMSG EPOLLMSG
EPOLLERR = 0x008,
#define EPOLLERR EPOLLERR
EPOLLHUP = 0x010,
#define EPOLLHUP EPOLLHUP
EPOLLRDHUP = 0x2000,
#define EPOLLRDHUP EPOLLRDHUP
EPOLLWAKEUP = 1u << 29,
#define EPOLLWAKEUP EPOLLWAKEUP
EPOLLONESHOT = 1u << 30,
#define EPOLLONESHOT EPOLLONESHOT
EPOLLET = 1u << 31
#define EPOLLET EPOLLET
};
(3)epoll_wait();
参数events是分配好的epoll_event结构体数组;
epoll将会把发生的事件赋值到events数组中(events不可以是空指针,内核只负责把数据复制到)
参数timeout 是超时时间(毫秒,0表示立即返回,-1表示永久阻塞);
如果函数调用成功,返回对应IO上已经准备好的文件描述符数目;如果返回0,表示已超时;如果返回值小于0,表示函数失败。
3.epoll mode
epoll的使用也很方便,基本都会用到一下模型,取自linux帮助手册:
/*
Example for suggested usage
While the usage of epoll when employed as a level-triggered interface does have the same semantics as poll(2), the edge-triggered usage requires more
clarification to avoid stalls in the application event loop. In this example, listener is a nonblocking socket on which listen(2) has been called. The
function do_use_fd() uses the new ready file descriptor until EAGAIN is returned by either read(2) or write(2). An event-driven state machine applica‐
tion should, after having received EAGAIN, record its current state so that at the next call to do_use_fd() it will continue to read(2) or write(2) from
where it stopped before.*/
#define MAX_EVENTS 10
struct epoll_event ev, events[MAX_EVENTS];
int listen_sock, conn_sock, nfds, epollfd;
/* Code to set up listening socket, 'listen_sock',
(socket(), bind(), listen()) omitted */
static Epoll_Case()
{
epollfd = epoll_create1(0);
if (epollfd == -1) {
perror("epoll_create1");
exit(EXIT_FAILURE);
}
ev.events = EPOLLIN;
ev.data.fd = listen_sock;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev) == -1) {
perror("epoll_ctl: listen_sock");
exit(EXIT_FAILURE);
}
for (;;) {
nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
if (nfds == -1) {
perror("epoll_wait");
exit(EXIT_FAILURE);
}
for (n = 0; n < nfds; ++n) {
if (events[n].data.fd == listen_sock) {
conn_sock = accept(listen_sock,
(struct sockaddr *) &local, &addrlen);
if (conn_sock == -1) {
perror("accept");
exit(EXIT_FAILURE);
}
setnonblocking(conn_sock);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = conn_sock;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock,
&ev) == -1) {
perror("epoll_ctl: conn_sock");
exit(EXIT_FAILURE);
}
} else {
do_use_fd(events[n].data.fd);
}
}
}
}
/*
When used as an edge-triggered interface, for performance reasons, it is possible to add the file descriptor inside the epoll interface (EPOLL_CTL_ADD)
once by specifying (EPOLLIN|EPOLLOUT). This allows you to avoid continuously switching between EPOLLIN and EPOLLOUT calling epoll_ctl(2) with
EPOLL_CTL_MOD.
*/
下面给出一个epoll 服务器的简单例子,参考了网上不少方式,自己写了一个,算不上原创,把BIT_TEST部分注释打开,可以忽略basetype.h,可以编译通过。创建socket的地方可以自己优化省略,我实现的时候觉得这种方式比较容易看清楚socket的获取方式。
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <poll.h>
#include <assert.h>
#include <unistd.h>
#include <fcntl.h>
#include "basetype.h"
static int g_Main_Epoll_Fd = -1;
#define MAX_EPOLL_EVENT_COUNT 64
#define MAX_LISTEN_NUM 10
#define MAX_BUFF_LEN 512
/*
#define BIT_TEST(a, b) ((a) & (b))
#define BIT_RESET(a, b) ((a) ~= (b))
#define BIT_SET(a, b) ((a) |= (b))
#define BIT_MATCH(a,b) ((a)&(b) = (b))
#define BIT_COMPARE(a, b, c) ((a)&(b) == (c))
*/
static void print_socket_info(struct addrinfo *ai)
{
char ipstr[INET6_ADDRSTRLEN];
uint16_t port;
void *addr = NULL;
char *ipver = NULL;
struct sockaddr_in *ipv4 = NULL;
struct sockaddr_in6 *ipv6 = NULL;
assert(NULL != ai);
if (ai->ai_family == AF_INET)
{ // IPv4
ipv4 = (struct sockaddr_in *)ai->ai_addr;
addr = &(ipv4->sin_addr);
ipver = "IPv4";
port =ntohs(((struct sockaddr_in*)ai->ai_addr)->sin_port);
}
else
{ // IPv6
ipv6 = (struct sockaddr_in6 *)ai->ai_addr;
addr = &(ipv6->sin6_addr);
ipver = "IPv6";
port =ntohs(((struct sockaddr_in6*)ai->ai_addr)->sin6_port);
}
// convert the IP to a string and print it:
(void)inet_ntop(ai->ai_family, addr, ipstr, sizeof ipstr);
printf(" server initing... AF: %s IP: %s,PORT: %u\n", ipver, ipstr,port);
return ;
}
static int SetSocketNoblocking(const int sockfd)
{
int flags = 0;
int iret = 0;
flags = fcntl (sockfd, F_GETFL, 0);
if (flags == -1)
{
perror ("fcntl");
return -1;
}
flags |= O_NONBLOCK;
iret = fcntl (sockfd, F_SETFL, flags);
if (iret == -1)
{
perror ("fcntl");
return -1;
}
return 0;
}
static int Epoll_Control(const int sockfd, const int oper, const int events)
{
struct epoll_event ep_event;
int iret = 0;
memset(&ep_event, 0, sizeof(ep_event));
ep_event.events = events;
ep_event.data.fd = sockfd;
iret = epoll_ctl(g_Main_Epoll_Fd, oper, sockfd, &ep_event);
if (0 > iret)
{
perror("epoll_ctl error.");
printf("epoll_ctl error opt:%d\n", oper);
return -1;
}
return 0;
}
static int CreateServerScoket(const char *host, const char *port)
{
struct addrinfo hints;
struct addrinfo*ailist= NULL;
struct addrinfo *ai= NULL;
int iret;
int sockfd = 0;
int sockoptval = 1;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC; // AF_INET 或 AF_INET6
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE; /* All interfaces */
iret = getaddrinfo(host, port, &hints, &ailist);
if (0 != iret)
{
fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(iret));
return -1;
}
for(ai = ailist;ai != NULL; ai = ai->ai_next)
{
sockfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
if (0 > sockfd)
{
continue;
}
/* set reuse addr */
iret = setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &sockoptval, sizeof(sockoptval));
if (0 > iret)
{
perror("setsockopt error\n");
freeaddrinfo(ailist);
return -1;
}
iret = bind(sockfd, ai->ai_addr, ai->ai_addrlen);/*sizeof(struct sockaddr)*/
if (0 == iret)
{
print_socket_info(ai);
break;
}
else
{
close(sockfd);
}
}
if (NULL == ai)
{
fprintf(stderr, "get server socket error.\n");
return -1;
}
freeaddrinfo(ailist);
/* set no-block mode */
iret = SetSocketNoblocking(sockfd);
if (0 != iret)
{
return -1;
}
/* listen socket */
iret = listen(sockfd, MAX_LISTEN_NUM);
if (0 > iret)
{
perror("listen socket error.\n");
return -1;
}
printf("server socket init success. listenning...\n");
return sockfd;
}
static void HandleAccept(const int listen_scok, int Event)
{
struct sockaddr_in addr;
socklen_t len = sizeof(addr);
int client_sock = -1;
if (BIT_TEST(EPOLLIN, Event))
{
client_sock = accept(listen_scok, (struct sockaddr *)&addr, &len);
if(client_sock < 0 )
{
perror("accept infd error");
return;
}
(void)Epoll_Control(client_sock, EPOLL_CTL_ADD, EPOLLIN);
}
return ;
}
static void ProcRecvMsg(int sockfd)
{
ssize_t recvlen = 0;
char recvBuf[MAX_BUFF_LEN];
//read ev ready;
recvlen = recv(sockfd, recvBuf, sizeof(recvBuf), 0);
if(recvlen < 0)
{
perror("recv error.");
return ;
}
else if(recvlen == 0)
{
printf("client quit\n");
(void)Epoll_Control(sockfd, EPOLL_CTL_DEL, 0);
close(sockfd);
}
else
{
(void)Epoll_Control(sockfd, EPOLL_CTL_MOD, EPOLLOUT);
printf("recv msg fprom client:msg#:%s\n",recvBuf);
}
return ;
}
static void SendMsg2Client(const int sockfd)
{
ssize_t sendlen = 0;
char sendBuf[MAX_BUFF_LEN];
int event = 0;
sprintf(sendBuf, "HTTP/1.0 200 OK\r\n\r\n<html><h1>Hello Epoll! [client_fd:%d]</h1></html>", sockfd);
int sendsize = send(sockfd, sendBuf, strlen(sendBuf)+1, 0);
if(sendsize <= 0)
{
perror("send error.");
(void)Epoll_Control(sockfd, EPOLL_CTL_DEL, 0);
close(sockfd);
}
else
{
printf("Server reply msg ok! data: %s\n", sendBuf);
event = EPOLLIN | EPOLLERR | EPOLLHUP;
(void)Epoll_Control(sockfd, EPOLL_CTL_MOD, event);
}
return ;
}
int main(int argc, char *argv[])
{
int listenfd =0;
int epfd = 0;
int iret = 0;
int event = 0;
struct epoll_event ep_events[MAX_EPOLL_EVENT_COUNT];
int ev_num = 0;
int in_sockfd = 0;
int loop = 0;
if (3 > argc)
{
printf("Usage: %s [ip_addr] [port]\n",argv[0]);
return -1;
}
listenfd = CreateServerScoket(argv[1], argv[2]);
if (0 > listenfd)
{
return -1;
}
epfd = epoll_create(1);
if (-1 == epfd)
{
perror("create epoll error.\n");
return -1;
}
else
{
g_Main_Epoll_Fd = epfd;
}
event = EPOLLIN;
iret = Epoll_Control(listenfd, EPOLL_CTL_ADD, event);
if (0 != iret)
{
return -1;
}
/* proc epoll event regined */
printf("server init success, epoll wait now.\n");
for(;;)
{
memset(ep_events, 0, sizeof(ep_events));
ev_num = epoll_wait(g_Main_Epoll_Fd, ep_events, MAX_EPOLL_EVENT_COUNT, -1);
switch (ev_num)
{
case 0:
{
printf("epoll_wait timeout.\n");
break;
}
case -1:
{
perror("epoll_wait error.\n");
break;
}
default:
{
for (loop = 0; loop < ev_num; loop++)
{
in_sockfd = ep_events[loop].data.fd;
event = ep_events[loop].events;
if (in_sockfd == listenfd)
{
HandleAccept(in_sockfd, event);
}
else
{
if (BIT_TEST(EPOLLIN, event))
{
/* have data to read */
ProcRecvMsg(in_sockfd);
}
else if (BIT_TEST(EPOLLOUT, event))
{
SendMsg2Client(in_sockfd);
}
}
}
break;
}
}
}
return 0;
}
结果如下,客户端任意都可以:注意,运行时需要手动输入服务器主机名及端口号。
上一篇: MySQL基础篇(01):经典实用查询案例,总结整理
下一篇: 公司有个女汉子