libevent中的事件机制
程序员文章站
2022-03-10 19:10:14
libevent是事件驱动的网络库,事件驱动是他的核心,所以理解事件驱动对于理解整个网络库有很重要的意义。 本着从简入繁,今天分析下单线程最简单的事件触发。通过sample下的event-test来理解libevent的事件驱动。 代码版本为1.4.14。 libevent事件机制:当事件发生, l ......
libevent是事件驱动的网络库,事件驱动是他的核心,所以理解事件驱动对于理解整个网络库有很重要的意义。
本着从简入繁,今天分析下单线程最简单的事件触发。通过sample下的event-test来理解libevent的事件驱动。
代码版本为1.4.14。
libevent事件机制:当事件发生, libevent就会根据用户设定的方式自动执行指定的回调函数,来处理事件。
这是一种reactor方式的事件通知方式,由事件驱动。reactor的优点:响应快,编程简单等等。。。
首先看下几个重要的结构。等全部分析完libevent,再把全部注释过的代码上传到github上。如果有错误及时告诉我,谢谢。
1.event_base我的理解是当前线程中所有事件的一个管理者。位于event-internal.h中。
1 //事件基础管理 2 struct event_base { 3 //I/O复用类型,select、epoll...linux默认是epoll 4 const struct eventop *evsel; 5 //具体的I/O复用,是epollop类型,通过eventop中的init函数返回,包含了具体的I/O复用各种信息 6 void *evbase; 7 //总共的事件个数 8 int event_count; /* counts number of total events */ 9 //总共的活动事件个数 10 int event_count_active; /* counts number of active events */ 11 12 //退出 13 int event_gotterm; /* Set to terminate loop */ 14 //立即退出 15 int event_break; /* Set to terminate loop immediately */ 16 17 /* active event management */ 18 //活动事件队列,二维链表。第一维是根据优先级,第二维是每个优先级中对应加入的事件 19 struct event_list **activequeues; 20 //优先级队列数量。数组第一维必须告诉大小。因为如果是数组,参入函数,第一维肯定退化为指针,无法知道长度 21 int nactivequeues; 22 23 //信号信息 24 /* signal handling info */ 25 struct evsignal_info sig; 26 27 //所有事件队列 28 struct event_list eventqueue; 29 30 //event_base创建时间 31 struct timeval event_tv; 32 33 //event_base时间小根堆 34 struct min_heap timeheap; 35 36 //event_base缓存时间 37 struct timeval tv_cache; 38 };2.eventop
当前选用的I/O复用模型的封装。位于event-internal.h中。
1 //I/O复用封装 2 struct eventop { 3 const char *name; 4 void *(*init)(struct event_base *); //初始化 5 int (*add)(void *, struct event *); //注册 6 int (*del)(void *, struct event *); //删除 7 int (*dispatch)(struct event_base *, void *, struct timeval *); //事件分发 8 void (*dealloc)(struct event_base *, void *);//释放资源 9 /* set if we need to reinitialize the event base */ 10 int need_reinit; 11 };3.event
事件信息的封装
1 struct event { 2 //事件在队列中的节点(下次分析此队列的实现) 3 TAILQ_ENTRY (event) ev_next; 4 TAILQ_ENTRY (event) ev_active_next; 5 TAILQ_ENTRY (event) ev_signal_next; 6 //事件在最小时间堆中位置 7 unsigned int min_heap_idx; /* for managing timeouts */ 8 9 //事件的当前管理类 10 struct event_base *ev_base; 11 //事件对应的文件描述符,一切皆文件 12 int ev_fd; 13 //事件类型 14 short ev_events; 15 //发送到活动队列后要执行的次数 16 short ev_ncalls; 17 //ev_pncalls指向ev_ncalls,允许在回调中将自己的事件执行次数置为0,然后退出 18 short *ev_pncalls; /* Allows deletes in callback */ 19 20 //事件触发的时间 21 struct timeval ev_timeout; 22 23 //事件优先级 24 int ev_pri; /* smaller numbers are higher priority */ 25 26 //事件到来回调 27 void (*ev_callback)(int, short, void *arg); 28 //事件到来回调的参数 29 void *ev_arg; 30 31 //事件在活动队列中的事件类型,发送给回调函数,让回调函数知道发生事件的原因 32 int ev_res; /* result passed to event callback */ 33 34 //标识该事件在哪个队列中,插入的是哪个队列 35 int ev_flags; 36 };4.接着看几个比较重要的宏定义
1 //队列标记 2 //定时器队列,与时间有关的事件加入此队列 3 #define EVLIST_TIMEOUT 0x01 4 //总队列,代表已经插入过 5 #define EVLIST_INSERTED 0x02 6 //信号队列 7 #define EVLIST_SIGNAL 0x04 8 //活动队列 9 #define EVLIST_ACTIVE 0x08 10 //内部队列 11 #define EVLIST_INTERNAL 0x10 12 //初始化队列 13 #define EVLIST_INIT 0x80 14 15 /* EVLIST_X_ Private space: 0x1000-0xf000 */ 16 #define EVLIST_ALL (0xf000 | 0x9f) 17 //事件类型,发生了什么事件 18 19 //定时超时,表明事件超时,如果在活动队列中,需要执行 20 #define EV_TIMEOUT 0x01 21 //I/O事件 22 #define EV_READ 0x02 23 #define EV_WRITE 0x04 24 //信号 25 #define EV_SIGNAL 0x08 26 //持续事件 27 #define EV_PERSIST 0x10 /* Persistant event */事件机制流程图
通过流程图可以更加清晰的理解。
测试代码test.c
1 #include <sys/types.h> 2 #include <sys/stat.h> 3 #include <sys/queue.h> 4 #include <sys/time.h> 5 #include <fcntl.h> 6 #include <stdlib.h> 7 #include <stdio.h> 8 #include <string.h> 9 #include <unistd.h> 10 #include <errno.h> 11 int main(int argc,char **argv){ 12 char *input = argv[1]; 13 if(argc !=2){ 14 input = "hello"; 15 } 16 int fd; 17 fd = open("event.fifo",O_WRONLY); 18 if(fd == -1){ 19 perror("open error"); 20 exit(EXIT_FAILURE); 21 } 22 write(fd,input,strlen(input)); 23 close(fd); 24 printf("write success\n"); 25 return 0; 26 }1.首先运行./event-test
可以看到linux默认的I/O复用是epoll。加入队列,并且不是定时函数。最后进入循环到达epoll_dispatch分发。
2.另一个终端中运行./test此时上面一个终端可以收到如下信息:事件触发,调用event_del,从相应队列中删除,然后执行,event-test.c中再次加入了事件,进入事件循环。
具体函数注释: 1.event_base_new1 //创建event_base 2 struct event_base * 3 event_base_new(void) 4 { 5 int i; 6 struct event_base *base; 7 //申请空间 8 if ((base = calloc(1, sizeof(struct event_base))) == NULL) 9 event_err(1, "%s: calloc", __func__); 10 11 event_sigcb = NULL; 12 event_gotsig = 0; 13 //是否使用绝对时间 14 detect_monotonic(); 15 //获取event_base创建时间 16 gettime(base, &base->event_tv); 17 18 //初始化小根堆 19 min_heap_ctor(&base->timeheap); 20 //初始化队列 21 TAILQ_INIT(&base->eventqueue); 22 //信号相关 23 base->sig.ev_signal_pair[0] = -1; 24 base->sig.ev_signal_pair[1] = -1; 25 26 base->evbase = NULL; 27 //获得I/O复用,选到合适的就往下执行。linux默认是epoll 28 for (i = 0; eventops[i] && !base->evbase; i++) { 29 //获得I/O复用 30 base->evsel = eventops[i]; 31 //获得具体的I/O复用信息 32 base->evbase = base->evsel->init(base); 33 } 34 //没有I/O复用,报错退出 35 if (base->evbase == NULL) 36 event_errx(1, "%s: no event mechanism available", __func__); 37 //如果设置了EVENT_SHOW_METHOD,输出IO复用名字 38 if (evutil_getenv("EVENT_SHOW_METHOD")) 39 event_msgx("libevent using: %s\n", 40 base->evsel->name); 41 42 /* allocate a single active event queue */ 43 //初始化活动队列的优先级,默认优先级为1 44 event_base_priority_init(base, 1); 45 46 return (base); 47 }2.event_base_priority_init
1 //初始化优先队列 2 int 3 event_base_priority_init(struct event_base *base, int npriorities) 4 { 5 int i; 6 //如果base中有活动事件,返回,不处理优先级的初始化 7 if (base->event_count_active) 8 return (-1); 9 //如果优先级数量未变,没有必要执行 10 if (npriorities == base->nactivequeues) 11 return (0); 12 //释放所有优先级队列 13 if (base->nactivequeues) { 14 for (i = 0; i < base->nactivequeues; ++i) { 15 free(base->activequeues[i]); 16 } 17 free(base->activequeues); 18 } 19 20 /* Allocate our priority queues */ 21 //分配优先级队列 22 base->nactivequeues = npriorities; 23 base->activequeues = (struct event_list **) 24 calloc(base->nactivequeues, sizeof(struct event_list *)); 25 if (base->activequeues == NULL) 26 event_err(1, "%s: calloc", __func__); 27 //默认每个优先级分配一个节点,作为事件队列的队列的头结点 28 for (i = 0; i < base->nactivequeues; ++i) { 29 base->activequeues[i] = malloc(sizeof(struct event_list)); 30 if (base->activequeues[i] == NULL) 31 event_err(1, "%s: malloc", __func__); 32 //每个事件都初始化为队列的头结点 33 TAILQ_INIT(base->activequeues[i]); 34 } 35 36 return (0); 37 }3.event_set
1 //设置与注册event 2 //ev: 需要注册的事件 3 //fd: 文件描述符 4 //events: 注册事件的类型 5 //callback: 注册事件的回调函数 6 //arg: 注册事件回调函数的参数 7 //事件类型有: 8 //#define EV_TIMEOUT 0x01 9 //#define EV_READ 0x02 10 //#define EV_WRITE 0x04 11 //#define EV_SIGNAL 0x08 12 //定时事件event_set(ev, -1, 0, cb, arg) 13 void 14 event_set(struct event *ev, int fd, short events, 15 void (*callback)(int, short, void *), void *arg) 16 { 17 /* Take the current base - caller needs to set the real base later */ 18 //默认为全局ev_base进行事件的注册 19 ev->ev_base = current_base; 20 //事件回调 21 ev->ev_callback = callback; 22 //事件回调参数 23 ev->ev_arg = arg; 24 //对应文件描述符 25 ev->ev_fd = fd; 26 //事件类型 27 ev->ev_events = events; 28 //事件在活动队列中的类型 29 ev->ev_res = 0; 30 //标识事件加入了哪个队列 31 ev->ev_flags = EVLIST_INIT; 32 //加入活动队列后调试的次数 33 ev->ev_ncalls = 0; 34 //Allows deletes in callback,允许在回调中删除自己 35 ev->ev_pncalls = NULL; 36 //初始化事件在堆中的位置。刚开始为-1 37 min_heap_elem_init(ev); 38 39 /* by default, we put new events into the middle priority */ 40 //默认事件的优先级为中间 41 if(current_base) 42 ev->ev_pri = current_base->nactivequeues/2; 43 }4.event_add
1 //事件加入队列 2 int 3 event_add(struct event *ev, const struct timeval *tv) 4 { 5 //事件的基础管理,事件中有一个event_base指针,指向了他所属于的管理类 6 struct event_base *base = ev->ev_base; 7 //当前I/O复用管理,包括初始化,注册,回调等。。。 8 const struct eventop *evsel = base->evsel; 9 //具体的I/O复用 10 void *evbase = base->evbase; 11 int res = 0; 12 13 event_debug(( 14 "event_add: event: %p, %s%s%scall %p", 15 ev, 16 ev->ev_events & EV_READ ? "EV_READ " : " ", 17 ev->ev_events & EV_WRITE ? "EV_WRITE " : " ", 18 tv ? "EV_TIMEOUT " : " ", 19 ev->ev_callback)); 20 21 assert(!(ev->ev_flags & ~EVLIST_ALL)); 22 23 /* 24 * prepare for timeout insertion further below, if we get a 25 * failure on any step, we should not change any state. 26 */ 27 //事件的时间tv不为null并且现在事件还不在定时队列中,我们先在小根堆中申请一个位置,以便后面加入 28 //event_set后事件的ev_flags为EVLIST_INIT 29 if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) { 30 if (min_heap_reserve(&base->timeheap, 31 1 + min_heap_size(&base->timeheap)) == -1) 32 return (-1); /* ENOMEM == errno */ 33 } 34 //如果事件类型是EV_READ,EV_WRITE,EV_SIGNAL并且事件状态不是EVLIST_INSERTED(已加入)与EVLIST_ACTIVE(已活动) 35 if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) && 36 !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) { 37 //将事件加入到对应的I/O复用中 38 res = evsel->add(evbase, ev); 39 if (res != -1) 40 //加入对应的I/O复用成功后,插入EVLIST_INSERTED队列 41 event_queue_insert(base, ev, EVLIST_INSERTED); 42 } 43 44 /* 45 * we should change the timout state only if the previous event 46 * addition succeeded. 47 */ 48 //定时执行事件处理(tv不为零,表示有超时时间) 49 if (res != -1 && tv != NULL) { 50 struct timeval now; 51 52 /* 53 * we already reserved memory above for the case where we 54 * are not replacing an exisiting timeout. 55 */ 56 //定时事件已经在定时队列中了,先从中删除 57 if (ev->ev_flags & EVLIST_TIMEOUT) 58 event_queue_remove(base, ev, EVLIST_TIMEOUT); 59 60 /* Check if it is active due to a timeout. Rescheduling 61 * this timeout before the callback can be executed 62 * removes it from the active list. */ 63 //定时事件是否在活动队列中,并且是定时事件,如果是,从活动队列中删除 64 if ((ev->ev_flags & EVLIST_ACTIVE) && 65 (ev->ev_res & EV_TIMEOUT)) { 66 /* See if we are just active executing this 67 * event in a loop 68 */ 69 //调用次数置零 70 if (ev->ev_ncalls && ev->ev_pncalls) { 71 /* Abort loop */ 72 *ev->ev_pncalls = 0; 73 } 74 //从活动队列中删除 75 event_queue_remove(base, ev, EVLIST_ACTIVE); 76 } 77 78 //得到当前时间 79 gettime(base, &now); 80 //更新时间 81 //当前时间点+定时事件每隔多少秒触发时间=触发时间点。ev->ev_timeout为事件触发时间点 82 evutil_timeradd(&now, tv, &ev->ev_timeout); 83 84 event_debug(( 85 "event_add: timeout in %ld seconds, call %p", 86 tv->tv_sec, ev->ev_callback)); 87 //加入定时队列 88 event_queue_insert(base, ev, EVLIST_TIMEOUT); 89 } 90 91 return (res); 92 }5.event_base_loop
1 /* not thread safe */ 2 //默认进入全局事件管理的事件循环 3 int 4 event_loop(int flags) 5 { 6 return event_base_loop(current_base, flags); 7 } 8 //事件分发,进入事件循环,默认进入全局事件管理的事件循环 9 int 10 event_base_loop(struct event_base *base, int flags) 11 { 12 //I/O复用管理 13 const struct eventop *evsel = base->evsel; 14 //具体I/O复用 15 void *evbase = base->evbase; 16 struct timeval tv; 17 struct timeval *tv_p; 18 int res, done; 19 20 /* clear time cache */ 21 base->tv_cache.tv_sec = 0; 22 //信号处理 23 if (base->sig.ev_signal_added) 24 evsignal_base = base; 25 done = 0; 26 //事件循环 27 while (!done) { 28 /* Terminate the loop if we have been asked to */ 29 //退出 30 if (base->event_gotterm) { 31 base->event_gotterm = 0; 32 break; 33 } 34 //立即退出 35 if (base->event_break) { 36 base->event_break = 0; 37 break; 38 } 39 40 /* You cannot use this interface for multi-threaded apps */ 41 //信号处理 42 while (event_gotsig) { 43 event_gotsig = 0; 44 if (event_sigcb) { 45 res = (*event_sigcb)(); 46 if (res == -1) { 47 errno = EINTR; 48 return (-1); 49 } 50 } 51 } 52 53 //检测时间对不对,不对的话要校准 54 timeout_correct(base, &tv); 55 //tv为当前时间 56 tv_p = &tv; 57 //如果当前事件活动队列为0,并且事件是阻塞的,立马到时间堆中去查找定时时间 58 if (!base->event_count_active && !(flags & EVLOOP_NONBLOCK)) { 59 timeout_next(base, &tv_p); 60 } else { 61 /* 62 * if we have active events, we just poll new events 63 * without waiting. 64 */ 65 //活动队列不为空,或者此事件是非阻塞事件,将超时时间置为零,意味着没有超时时间 66 evutil_timerclear(&tv); 67 } 68 //没有可以执行的事件,退出 69 /* If we have no events, we just exit */ 70 if (!event_haveevents(base)) { 71 event_debug(("%s: no events registered.", __func__)); 72 return (1); 73 } 74 75 /* update last old time */ 76 //更新base的创建时间 77 gettime(base, &base->event_tv); 78 79 /* clear time cache */ 80 //清缓存 81 base->tv_cache.tv_sec = 0; 82 83 //进行对应事件的分发,将tv_p也传入进去,tv_p为超时时间 84 res = evsel->dispatch(base, evbase, tv_p); 85 86 if (res == -1) 87 return (-1); 88 //来事件了 89 //更新缓存时间 90 gettime(base, &base->tv_cache); 91 92 //进行超时处理,处理目前时间已经到达需要执行的事件,加入活动队列等操作 93 timeout_process(base); 94 95 //有活动队列 96 if (base->event_count_active) { 97 //调用 98 event_process_active(base); 99 //全部执行完,并且只要执行一次,就可以跳出循环了 100 if (!base->event_count_active && (flags & EVLOOP_ONCE)) 101 done = 1; 102 } else if (flags & EVLOOP_NONBLOCK) 103 //活动队列没有事件,而且是非阻塞,跳出循环 104 done = 1; 105 } 106 107 /* clear time cache */ 108 base->tv_cache.tv_sec = 0; 109 110 event_debug(("%s: asked to terminate loop.", __func__)); 111 return (0); 112 }6.timeout_next
1 //查找下一个需要处理的事件,这边需要指针的指针,因为假如小根堆中压根没有事件,将指针置为空 2 static int 3 timeout_next(struct event_base *base, struct timeval **tv_p) 4 { 5 struct timeval now; 6 struct event *ev; 7 struct timeval *tv = *tv_p; 8 //查找小根堆里面的事件最小的事件,没有就退出 9 if ((ev = min_heap_top(&base->timeheap)) == NULL) { 10 /* if no time-based events are active wait for I/O */ 11 //没有事件了,超时时间置为空,退出,时间指针置为空,所以需要指针的指针 12 *tv_p = NULL; 13 return (0); 14 } 15 16 if (gettime(base, &now) == -1) 17 return (-1); 18 //事件已经超时,需要立即执行,清空tv_p,超时时间为0,返回 19 if (evutil_timercmp(&ev->ev_timeout, &now, <=)) { 20 evutil_timerclear(tv); 21 return (0); 22 } 23 //事件还没有到执行的时间,计算出相差的时间,返回 24 evutil_timersub(&ev->ev_timeout, &now, tv); 25 26 assert(tv->tv_sec >= 0); 27 assert(tv->tv_usec >= 0); 28 29 event_debug(("timeout_next: in %ld seconds", tv->tv_sec)); 30 return (0); 31 }7.timeout_process
1 //进行时间处理 2 void 3 timeout_process(struct event_base *base) 4 { 5 struct timeval now; 6 struct event *ev; 7 //时间堆为空退出 8 if (min_heap_empty(&base->timeheap)) 9 return; 10 11 gettime(base, &now); 12 13 //事件执行时间比现在大时,需要执行,将此事件从event队列中删除 14 while ((ev = min_heap_top(&base->timeheap))) { 15 if (evutil_timercmp(&ev->ev_timeout, &now, >)) 16 break; 17 18 /* delete this event from the I/O queues */ 19 //从ev对应的队列中删除此事件 20 event_del(ev); 21 22 event_debug(("timeout_process: call %p", 23 ev->ev_callback)); 24 //发送到活动队列,激活此事件,事件的状态变更为EV_TIMEOUT,事件的执行次数改为1 25 event_active(ev, EV_TIMEOUT, 1); 26 } 27 }8.event_process_active
1 //对在活动队列中的事件调用他对应的回调 2 static void 3 event_process_active(struct event_base *base) 4 { 5 struct event *ev; 6 struct event_list *activeq = NULL; 7 int i; 8 short ncalls; 9 10 //取得第一个非空的优先级队列,nactivequeues越小,优先级越高 11 for (i = 0; i < base->nactivequeues; ++i) { 12 if (TAILQ_FIRST(base->activequeues[i]) != NULL) { 13 activeq = base->activequeues[i]; 14 break; 15 } 16 } 17 18 assert(activeq != NULL); 19 20 for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) { 21 //如果是持续事件,只从EVLIST_ACTIVE队列中删除事件即可 22 if (ev->ev_events & EV_PERSIST) 23 event_queue_remove(base, ev, EVLIST_ACTIVE); 24 else 25 event_del(ev); 26 27 /* Allows deletes to work */ 28 //允许删除自己 29 ncalls = ev->ev_ncalls; 30 ev->ev_pncalls = &ncalls; 31 while (ncalls) { 32 //持续调用,直到调用次数为0 33 ncalls--; 34 ev->ev_ncalls = ncalls; 35 (*ev->ev_callback)((int)ev->ev_fd, ev->ev_res, ev->ev_arg); 36 if (event_gotsig || base->event_break) 37 return; 38 } 39 } 40 }