欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

从零编写c++之http服务器(2)-epoll异步事件驱动框架

程序员文章站 2022-03-10 10:21:07
...

       epoll是select/poll基础上改进的为大规模描述符事件监测的机制,常出现在高性能,高并发的服务器设计中。在这里我们需要设计一个框架,实现套接字对象将自身注册到框架中,框架即可利用epoll对其套接字进行事件监测;当事件产生时通知相应的套接字对象。从而实现事件的监测与处理解耦。惯例还是献上类图。

完整源码见<https://github.com/kwansoner/panda.git>

                          从零编写c++之http服务器(2)-epoll异步事件驱动框架

       epoll事件中心设计为观察者模式。基类IEventHandle提供了一些protected的handle_xxx方法,分时是对应事件的通知回调。所有需要注册进入事件中心的套接字对象只需继承此基类即可。在这里handle_xxx设计为protected是由于此方法对于对象而言并非对外提供的接口,因此不可为public。

       基类IEvent提供有两个方法,register_event将套接字注册进入事件中心,shutdown_event关闭事件。当套接字对象套接字调用shutdown_event后,会产生close事件。事件中心会等待该套接字对象其它回调结束后,将套接字从事件中心注销,再调用handle_close接口; 堆上的套接字对象在这里释放自己。

class INetObserver
{
	friend class CNetObserver;
	public:
		virtual ~INetObserver(){};
		
	protected:
		// desc: 读事件回调函数
		// param: /套接字描述符
		// return: void
		virtual void handle_in(int) = 0;

		// desc: 写事件回调函数
		// param: /套接字描述符
		// return: void		
		virtual void handle_out(int) = 0;

		// desc: 关闭事件回调函数
		// param: /套接字描述符
		// return: void
		virtual void handle_close(int) = 0;

		// desc: 错误事件回调函数
		// param: /套接字描述符
		// return: void		
		virtual void handle_error(int) = 0;
};

// 套接字继承于IEventHandle, 注册进入事件中心, 从而获得事件通知
// 堆上的对象只能在handle_close中释放自己
class IEventHandle: public INetObserver
{
	public:
		// desc: 注册进入事件中心
		// param: fd/套接字描述符 type/事件类型
		// return: 0/成功 -1/失败	
		int register_event(int fd, EventType type = EDEFULT);

		// desc: 注册进入事件中心
		// param: socket/套接字对象 type/事件类型
		// return: 0/成功 -1/失败	
		int register_event(Socket::ISocket &socket, EventType type = EDEFULT);

		// desc: 关闭事件
		// param: fd/套接字描述符
		// return: 0/成功 -1/失败	
		int shutdown_event(int fd);

		// desc: 关闭事件
		// param: socket/套接字对象
		// return: 0/成功 -1/失败	
		int shutdown_event(Socket::ISocket &);
};

class IEvent
{		
	public:
		virtual ~IEvent(){};

		// desc: 注册进入事件中心
		// param: fd/套接字描述符 type/事件类型
		// return: 0/成功 -1/失败	
		virtual int register_event(int, IEventHandle *, EventType) = 0;

		// desc: 关闭事件
		// param: fd/套接字描述符
		// return: 0/成功 -1/失败	
		virtual int shutdown_event(int) = 0;
};

        我们先看注册接口,我们先查一下记录注册套接字对象的map,然后记录此次注册。根据是修改还是新增正确传入epoll_ctl的操作类型参数。这里注意一下不能调用record记录注册信息前,调用epoll_ctl。假设epoll_ctl先于record调用,存在以下情况,也就是client连接后立即发送请求数据; 这时我们利用epoll_ctl添加进入后但是还没调用record记录时却已经检测到事件,于是查询不到套接字注册记录事件被丢弃。

       同时这里套接字注册事件类型会默认加入EPOLLET,也就是边缘触发。边缘触发有助于减少重复的事件通知,但是也加大了编程难度。譬如刚举例的情况,事件丢失后就真的丢失了;对于下次请求依赖于上次回复的时候更是灾难。

int CEvent::register_event(int fd, IEventHandle *handle, EventType type)
{
	if(INVALID_FD(fd) || INVALID_FD(m_epollfd) || INVALID_POINTER(handle)){
		seterrno(EINVAL);
		return -1;
	}

	struct epoll_event newevent;
	newevent.data.fd = fd;
	newevent.events = type;
	
	ExistRet ret = isexist(fd, type, handle);
	if(ret == Existed)
		return 0;

	/*
	* epoll_ctl先执行会出现注册后立即产生事件
	* 但是此时未执行到record记录导致丢失事件的问题
	*/
	record(fd, type, handle);
	if(ret == HandleModify)
		return 0;

	int opt;
	if(ret == TypeModify || ret == Modify)
		opt = EPOLL_CTL_MOD;
	else if(ret == NotExist)
		opt = EPOLL_CTL_ADD;

	if(epoll_ctl(m_epollfd, opt, fd, &newevent) < 0){
		errsys("epoll op %d, fd %#x error\n", opt, fd);
		detach(fd, true);
		return -1;	
	}
	
	return 0;
}
int CEvent::unregister_event(int fd)
{
	if(epoll_ctl(m_epollfd, EPOLL_CTL_DEL, fd, NULL) < 0){
		errsys("epoll delete fd %d failed\n", fd);
		return -1;
	}
	
	return detach(fd);
}
int CEvent::shutdown_event(int fd)
{
	trace("sock[%#X] shutdown event\n", fd);
	return ::shutdown(fd, SHUT_WR);
}

        事件中心创建一条线程eventwait_thread,在这里调用epoll_wait等待事件。需要注意的是epoll_wait这种可能长时间睡眠的接口存在被系统中断返回EINTR错误的情况,我们忽略再次等待即可。事件发生后将事件放入队列并向线程池投入任务。

        线程池执行threadhandle接口,首先我们取出任务,取出任务同时会增加引用计数。然后获取套接字对象并回调其对应的接口,再递减引用计数。close事件时会递减两次,因为引用计数初始值为1。那么此时无论多少个线程在调用同个套接字的handle_xxx接口,最后一个处理完成的走到下面subref_and_test调用就会成立(引用计数减为零)。从而此时真正通知对象关闭,并注销套接字对象。引入引用计数正是为了解决其他线程在调用着handle_xxx接口,但此时连接关闭,立即释放套接字对象会引起的死机问题。

       同时引用计数减为零时处理也得考虑,unregister_event必须先于handle_close执行。否则将存在以下情形,连接A先执行handle_close关闭了套接字fd,但是此时仍未调用unregister_event将fd从epoll中注销。于是新的连接B过来得到的描述符会出现于等于刚关闭的描述符fd的情况,新的连接B尝试调用register_event发现记录已经存在,直接返回。然后连接A执行到unregister_event时发现fd是已经关闭了的又会出错。

void CEvent::threadhandle()
{
	int fd = 0x00;
	EventType events;
	if(poptask(fd, events) < 0){
		return;
	}
	CNetObserver *observer = get_observer(fd);
	if(observer == NULL)
		return;

	/*
	* 关闭时递减引用计数。在对象的所有回调处理完时真正释放
	*/
	if(events & ECLOSE){
		cleartask(fd);		
		observer->subref();	
		
	}else{	
		if(events & EERR){
			observer->handle_error(fd);
		}
		if(events & EIN){
			observer->handle_in(fd);
		}
		if(events & EOUT){
			observer->handle_out(fd);
		}
		
	}	
	
	/*
	* unregister_event 执行后于handle_close将会出现当前套接字关闭后
	* 在仍未执行完unregister_event时新的连接过来,得到一样的描述符
	* 新的连接调用register_event却未注册进入epoll。同时han_close中
	* 关闭了套接字,unregister_event中epoll删除关闭的套接字报错
	*/
	if(observer->subref_and_test()){
		unregister_event(fd);
		observer->handle_close(fd);	
		observer->selfrelease();
	}	
}
void *CEvent::eventwait_thread(void *arg)
{
	CEvent &cevent = *(CEvent *)(arg);
	if(INVALID_FD(cevent.m_epollfd)){
		seterrno(EINVAL);
		pthread_exit(NULL);
	}

	for(;;){
		int nevent = epoll_wait(cevent.m_epollfd, &cevent.m_eventbuff[0], EventBuffLen, -1);
		if(nevent < 0 && errno != EINTR){
			errsys("epoll wait error\n");
			break;
		}

		for(int i = 0; i < nevent; i++){

			int fd = cevent.m_eventbuff[i].data.fd;
			EventType events = static_cast<EventType>(cevent.m_eventbuff[i].events);
			
			if(cevent.pushtask(fd, events) == 0x00){
				cevent.m_ithreadpool->pushtask(&cevent);
			}
		}
	}

	pthread_exit(NULL);
}