从零编写c++之http服务器(2)-epoll异步事件驱动框架
epoll是select/poll基础上改进的为大规模描述符事件监测的机制,常出现在高性能,高并发的服务器设计中。在这里我们需要设计一个框架,实现套接字对象将自身注册到框架中,框架即可利用epoll对其套接字进行事件监测;当事件产生时通知相应的套接字对象。从而实现事件的监测与处理解耦。惯例还是献上类图。
完整源码见<https://github.com/kwansoner/panda.git>
epoll事件中心设计为观察者模式。基类IEventHandle提供了一些protected的handle_xxx方法,分时是对应事件的通知回调。所有需要注册进入事件中心的套接字对象只需继承此基类即可。在这里handle_xxx设计为protected是由于此方法对于对象而言并非对外提供的接口,因此不可为public。
基类IEvent提供有两个方法,register_event将套接字注册进入事件中心,shutdown_event关闭事件。当套接字对象套接字调用shutdown_event后,会产生close事件。事件中心会等待该套接字对象其它回调结束后,将套接字从事件中心注销,再调用handle_close接口; 堆上的套接字对象在这里释放自己。
class INetObserver
{
friend class CNetObserver;
public:
virtual ~INetObserver(){};
protected:
// desc: 读事件回调函数
// param: /套接字描述符
// return: void
virtual void handle_in(int) = 0;
// desc: 写事件回调函数
// param: /套接字描述符
// return: void
virtual void handle_out(int) = 0;
// desc: 关闭事件回调函数
// param: /套接字描述符
// return: void
virtual void handle_close(int) = 0;
// desc: 错误事件回调函数
// param: /套接字描述符
// return: void
virtual void handle_error(int) = 0;
};
// 套接字继承于IEventHandle, 注册进入事件中心, 从而获得事件通知
// 堆上的对象只能在handle_close中释放自己
class IEventHandle: public INetObserver
{
public:
// desc: 注册进入事件中心
// param: fd/套接字描述符 type/事件类型
// return: 0/成功 -1/失败
int register_event(int fd, EventType type = EDEFULT);
// desc: 注册进入事件中心
// param: socket/套接字对象 type/事件类型
// return: 0/成功 -1/失败
int register_event(Socket::ISocket &socket, EventType type = EDEFULT);
// desc: 关闭事件
// param: fd/套接字描述符
// return: 0/成功 -1/失败
int shutdown_event(int fd);
// desc: 关闭事件
// param: socket/套接字对象
// return: 0/成功 -1/失败
int shutdown_event(Socket::ISocket &);
};
class IEvent
{
public:
virtual ~IEvent(){};
// desc: 注册进入事件中心
// param: fd/套接字描述符 type/事件类型
// return: 0/成功 -1/失败
virtual int register_event(int, IEventHandle *, EventType) = 0;
// desc: 关闭事件
// param: fd/套接字描述符
// return: 0/成功 -1/失败
virtual int shutdown_event(int) = 0;
};
我们先看注册接口,我们先查一下记录注册套接字对象的map,然后记录此次注册。根据是修改还是新增正确传入epoll_ctl的操作类型参数。这里注意一下不能调用record记录注册信息前,调用epoll_ctl。假设epoll_ctl先于record调用,存在以下情况,也就是client连接后立即发送请求数据; 这时我们利用epoll_ctl添加进入后但是还没调用record记录时却已经检测到事件,于是查询不到套接字注册记录事件被丢弃。
同时这里套接字注册事件类型会默认加入EPOLLET,也就是边缘触发。边缘触发有助于减少重复的事件通知,但是也加大了编程难度。譬如刚举例的情况,事件丢失后就真的丢失了;对于下次请求依赖于上次回复的时候更是灾难。
int CEvent::register_event(int fd, IEventHandle *handle, EventType type)
{
if(INVALID_FD(fd) || INVALID_FD(m_epollfd) || INVALID_POINTER(handle)){
seterrno(EINVAL);
return -1;
}
struct epoll_event newevent;
newevent.data.fd = fd;
newevent.events = type;
ExistRet ret = isexist(fd, type, handle);
if(ret == Existed)
return 0;
/*
* epoll_ctl先执行会出现注册后立即产生事件
* 但是此时未执行到record记录导致丢失事件的问题
*/
record(fd, type, handle);
if(ret == HandleModify)
return 0;
int opt;
if(ret == TypeModify || ret == Modify)
opt = EPOLL_CTL_MOD;
else if(ret == NotExist)
opt = EPOLL_CTL_ADD;
if(epoll_ctl(m_epollfd, opt, fd, &newevent) < 0){
errsys("epoll op %d, fd %#x error\n", opt, fd);
detach(fd, true);
return -1;
}
return 0;
}
int CEvent::unregister_event(int fd)
{
if(epoll_ctl(m_epollfd, EPOLL_CTL_DEL, fd, NULL) < 0){
errsys("epoll delete fd %d failed\n", fd);
return -1;
}
return detach(fd);
}
int CEvent::shutdown_event(int fd)
{
trace("sock[%#X] shutdown event\n", fd);
return ::shutdown(fd, SHUT_WR);
}
事件中心创建一条线程eventwait_thread,在这里调用epoll_wait等待事件。需要注意的是epoll_wait这种可能长时间睡眠的接口存在被系统中断返回EINTR错误的情况,我们忽略再次等待即可。事件发生后将事件放入队列并向线程池投入任务。
线程池执行threadhandle接口,首先我们取出任务,取出任务同时会增加引用计数。然后获取套接字对象并回调其对应的接口,再递减引用计数。close事件时会递减两次,因为引用计数初始值为1。那么此时无论多少个线程在调用同个套接字的handle_xxx接口,最后一个处理完成的走到下面subref_and_test调用就会成立(引用计数减为零)。从而此时真正通知对象关闭,并注销套接字对象。引入引用计数正是为了解决其他线程在调用着handle_xxx接口,但此时连接关闭,立即释放套接字对象会引起的死机问题。
同时引用计数减为零时处理也得考虑,unregister_event必须先于handle_close执行。否则将存在以下情形,连接A先执行handle_close关闭了套接字fd,但是此时仍未调用unregister_event将fd从epoll中注销。于是新的连接B过来得到的描述符会出现于等于刚关闭的描述符fd的情况,新的连接B尝试调用register_event发现记录已经存在,直接返回。然后连接A执行到unregister_event时发现fd是已经关闭了的又会出错。
void CEvent::threadhandle()
{
int fd = 0x00;
EventType events;
if(poptask(fd, events) < 0){
return;
}
CNetObserver *observer = get_observer(fd);
if(observer == NULL)
return;
/*
* 关闭时递减引用计数。在对象的所有回调处理完时真正释放
*/
if(events & ECLOSE){
cleartask(fd);
observer->subref();
}else{
if(events & EERR){
observer->handle_error(fd);
}
if(events & EIN){
observer->handle_in(fd);
}
if(events & EOUT){
observer->handle_out(fd);
}
}
/*
* unregister_event 执行后于handle_close将会出现当前套接字关闭后
* 在仍未执行完unregister_event时新的连接过来,得到一样的描述符
* 新的连接调用register_event却未注册进入epoll。同时han_close中
* 关闭了套接字,unregister_event中epoll删除关闭的套接字报错
*/
if(observer->subref_and_test()){
unregister_event(fd);
observer->handle_close(fd);
observer->selfrelease();
}
}
void *CEvent::eventwait_thread(void *arg)
{
CEvent &cevent = *(CEvent *)(arg);
if(INVALID_FD(cevent.m_epollfd)){
seterrno(EINVAL);
pthread_exit(NULL);
}
for(;;){
int nevent = epoll_wait(cevent.m_epollfd, &cevent.m_eventbuff[0], EventBuffLen, -1);
if(nevent < 0 && errno != EINTR){
errsys("epoll wait error\n");
break;
}
for(int i = 0; i < nevent; i++){
int fd = cevent.m_eventbuff[i].data.fd;
EventType events = static_cast<EventType>(cevent.m_eventbuff[i].events);
if(cevent.pushtask(fd, events) == 0x00){
cevent.m_ithreadpool->pushtask(&cevent);
}
}
}
pthread_exit(NULL);
}
上一篇: c语言大于等于怎么打?