欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

最小堆的实现及应用-----定时器(3):时间堆

程序员文章站 2022-06-09 16:47:18
...

基于升序链表的定时器和时间轮都是以固定的频率来调用心搏函数,并在其中依次检测到期的定时器,然后执行到期定时器上的回调函数,这样的定时并不准确,因为可能已有定时器到期了,但是因为心搏函数此时还未调用而无法处理定时任务。

设计定时器的另一种思路是:将所有定时器中超时时间最小的一个定时器的超时值作为心搏间隔。这样,一旦心博函数tick被调用,超时时间最小的定时器必然到期,我们就可以在tick函数中处理该定时器。然后,再次从剩余的定时器中找出超时值最小的一个,并将这个设置为下一心博间隔。如此反复,就可以实现较为精准的定时,最小堆这种数据结构无疑是非常适合这种方案的

最小堆:是指每个节点的值都小于或等于其子节点的值的完全二叉树。如下图所示:
最小堆的实现及应用-----定时器(3):时间堆
对于最小堆的操作无非就是插入和删除节点,对于插入操作来说,我们要将X元素插入最小堆的话,可以在树的最后一个节点后面建立一个空穴节点。如果X可以放入空穴中而不破坏最小堆的性质,则插入完成。否则要进行上虑操作,即交换空穴和父节点上的元素。不断重复上述过程,直到X可以被放入到空穴中,则插入操作完成。比如我们要为上图所示的最小堆插入一个值为13的元素,则插入过程如下图所示:
最小堆的实现及应用-----定时器(3):时间堆
删除操作:我们先以删除堆顶(根节点)为例说明,我们可以将要删除的根节点看做空穴,然后将最后一个元素X放入空穴中,若不破坏最小堆的性质,则删除成功。否则做下滤操作,即交换空穴与其儿子节点中的较小者上的元素。不断重复上述过程,直到X可以被放入到空穴中,则删除操作完成。那么对于一般的节点我们如何删除呢?很简单,我们将被删除的节点看做空穴,将最后一个元素放入其中,先进行下虑操作,保证以该空穴为根的子树满足最小堆的性质,然后再进行上虑操作。


代码实现:

由于最小堆是一种完全二叉树,所以我们可以用数组来组织其中的元素。对于数组中的任意一个位置ii上的元素,其左儿子节点在位置2i+12i+1上,其右儿子节点在位置2i+22i+2上,其父结点则在位置(i1)/2(i-1)/2上。对于图一所示的最小堆可以用下图所示的数组来表示:
最小堆的实现及应用-----定时器(3):时间堆
既然最小堆可以用数组来描述,那给一个数组,我们如何将这个数组初始化为最小堆呢?我们只需要对数字中第(N1)/2(N-1)/2个元素到第00个元素执行下虑操作,就可以确保该数组构成一个最小堆。这是因为对于包含N个元素的完全二叉树而言,它具有(N1)/2(N-1)/2个非叶子节点,这些非叶子节点正是该完全二叉树的第00(N1)/2(N-1)/2个节点。我们只需要确保这些非叶子节点构成的子树都具有最小堆的性质,则整个树就具有最小堆的性质。

下面代码中的时间堆类是一个最小堆的实现,包含创建最小堆,初始化数组为最小堆,插入,删除,判断最小堆是否为空,获取堆顶元素,最小堆扩容,销毁最小堆等操作,并联合server.cpp展示了最小堆在服务器上通过定时事件来处理非活动客户端连接的应用-----时间堆。

#ifndef _TIME_HEAP_H
#define _TIME_HEAP_H
#include<iostream>
#include<stdio.h>
#include<netinet/in.h>
#include<time.h>
using std::exception;
class heap_timer;
//用户数据结构
struct client_data
{
	sockaddr_in address;//客户端socket地址
	int sockfd;			//socket文件描述符
	char buf[BUFSIZ];	//读缓存
	heap_timer *timer;	//定时器
};
//定时器类
class heap_timer
{
	public:
		time_t expire;	/*定时器生效的绝对时间*/
		void (*function) (client_data*);//任务回调函数
		client_data* user_data;	//用户数据
	public:
		heap_timer(int delay)
		{
			expire = time(NULL) + delay;
		}
};
/*时间堆类*/
class time_heap
{
	private:
		heap_timer** heap;/*堆数组*/
		int capacity;/*堆数组的容量*/
		int cur_size;/*元素个数*/
		/*最小堆下滤操作,确保堆数组中以第node个节点作为根的子树拥有最小堆性质*/
		void down(int node)
		{
			heap_timer* tmp = heap[node];
			int child;
			for(;(node*2+1) < cur_size;node = child)
			{
				child = node*2+1;
				if((child+1 < cur_size) && (heap[child+1]->expire < heap[child]->expire))
				{
					child++;
				}
				if(heap[child]->expire < tmp->expire)
				{
					heap[node] = heap[child];
				}
				else break;
			}
			heap[node] = tmp;
		}
		/*将堆数组容量扩大1倍*/
		void resize()
		{
			heap_timer** temp = new heap_timer*[2*capacity];
			if(temp == NULL)
				throw std::exception();
			for(int i = 0;i < 2*capacity;i++)
			{
				if(i < cur_size)
				{
					temp[i] = heap[i];
				}
				else temp[i] = NULL;
			}
			delete [] heap;
			heap = temp;
			capacity *= 2;
		}
	public:
		time_heap(int capacity):capacity(capacity),cur_size(0)
		{
			heap = new heap_timer*[capacity];
			if(heap == NULL)
				throw std::exception();
			for(int i = 0;i < capacity;i++)
				heap[i] = NULL;
		}
		time_heap(heap_timer** temp,int size,int capacity):capacity(capacity),cur_size(size)
		{
			if(size > capacity)
				throw std::exception();
			heap = new heap_timer*[capacity];
			if(heap == NULL)
				throw std::exception();
			for(int i = 0;i < capacity;i++)
			{
				if(i < size)
					heap[i] = temp[i];
				else heap[i] = NULL;
			}
			if(size > 0)
			{
				/*对非叶子节点进行下滤操作*/
				for(int i = (cur_size - 2)/2;i >= 0;i--)
					down(i);
			}
		}
		~time_heap()
		{
			for(int i = 0;i < cur_size;i++)
				delete heap[i];
			delete[]heap;
		}
		/*添加定时器*/
		void add_timer(heap_timer* timer)
		{
			if(timer == NULL)
				return;
			if(cur_size >= capacity)/*若空间不够,则扩大1倍*/
				resize();
			/*新插入一个元素,当前堆大小加1,node是新建空穴的位置*/
			int node = cur_size++;
			int parent;
			/*对从空穴到根节点的路径上的所有节点执行上虑操作*/
			for(;node > 0;node = parent)
			{
				parent = (node-1)/2;
				if(timer->expire >= heap[parent]->expire)
					break;
				else heap[node] = heap[parent];
			}
			heap[node] = timer;
		}
		/*查看堆是否为空*/
		bool empty()
		{
			return cur_size == 0;
		}
		/*获得堆顶*/
		heap_timer* top()
		{
			if(empty())
				throw std::exception();
			return heap[0];
		}
		/*删除堆顶定时器*/
		void pop()
		{
			if(empty())
			{
				throw std::exception();
			}
			delete heap[0];
			heap[0] = heap[cur_size-1];
			cur_size--;
			down(0);
		}
		/*删除目标定时器*/
		void del_timer(heap_timer* timer)
		{
			if(timer == NULL)
				return;
			/*将定时器对应的回调函数指向空,这样起到延迟删除的作用,节省了真正删除定时器的开销,但是会容易使堆数组膨胀*/
			timer->function = NULL;
		}
		/*调整目标定时器*/
		void adjust_timer(heap_timer* timer,time_t timeout)
		{
			if(timer == NULL)
				return;
			timer->expire = timeout;
			down(0);
		}
		/*心博函数*/
		void tick()
		{
			time_t timeout = time(NULL);
			while(!empty())
			{
				/*如果堆顶没到期则退出循环*/
				if(heap[0]->expire > timeout)
					break;
				/*否则执行堆顶定时器中的任务*/
				if(heap[0]->function != NULL)
					heap[0]->function(heap[0]->user_data);
				/*将堆顶删除*/
				pop();
			}
		}
};
#endif

server.cpp
#include<unistd.h>
#include<sys/epoll.h>
#include<sys/socket.h>
#include<sys/types.h>
#include<arpa/inet.h>
#include<string.h>
#include<stdlib.h>
#include<fcntl.h>
#include<errno.h>
#include<signal.h>
#include<assert.h>
#include"time_heap.h"
#define TIMEOUT 5
#define MAX_EVENT_NUMBER 1024
static int epollfd;
static time_heap min_heap(1024);
static int pipefd[2];
void set_nonblock(int fd)
{
	int flag = fcntl(fd,F_GETFL);
	flag |= O_NONBLOCK;
	fcntl(fd,F_SETFL,flag);
}
void addfd(int fd,bool flag)
{
	epoll_event event;
	event.data.fd = fd;
	event.events = EPOLLIN;
	if(flag)
		event.events |= EPOLLET;
	epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&event);
	set_nonblock(fd);
}
void sig_handler(int sig)
{
	send(pipefd[1],(char*)&sig,1,0);
}
void set_sig(int sig)
{
	struct sigaction sa;
	sa.sa_handler = sig_handler;
	sa.sa_flags |= SA_RESTART;
	sigfillset(&sa.sa_mask);
	sigaction(sig,&sa,NULL);
}
/*定时器回调函数,删除非活动连接socket上的注册事件,并关闭连接*/
void cb_function(client_data* user)
{
	epoll_ctl(epollfd,EPOLL_CTL_DEL,user->sockfd,NULL);
	close(user->sockfd);
	printf("close fd %d\n",user->sockfd);
}
void timer_handler()
{
	min_heap.tick();//定时处理任务
	/*获得堆顶的定时器,以它的定时时间设置alarm*/
	if(!min_heap.empty())
	{
		heap_timer* temp = min_heap.top();
		alarm(temp->expire-time(NULL));//重新定时,以不断触发sigalarm信号
	}
}
int main(int argc,const char* argv[])
{
	if(argc < 3)
	{
		printf("usage: %s ip_address port_number\n",argv[0]);
		exit(1);
	}
	 const char* ip = argv[1];
	 int port = atoi(argv[2]);

	 int listenfd = socket(AF_INET,SOCK_STREAM,0);

	 struct sockaddr_in listenaddr,connaddr;
	 socklen_t connaddrlen = sizeof(connaddr);
	 listenaddr.sin_family = AF_INET;
	 listenaddr.sin_port = htons(port);
	 inet_pton(AF_INET,ip,&listenaddr.sin_addr);
	 
	 bind(listenfd,(struct sockaddr*)&listenaddr,sizeof(listenaddr));

	 listen(listenfd,5);

	 epoll_event events[MAX_EVENT_NUMBER];
	 epollfd = epoll_create(3);
	 addfd(listenfd,false);//LT模式加入epoll中

	 int ret = socketpair(AF_UNIX,SOCK_STREAM,0,pipefd);
	 assert(ret != -1);
	 addfd(pipefd[0],true);

	 set_sig(SIGALRM);
	 set_sig(SIGTERM);
	 client_data *users = new client_data[MAX_EVENT_NUMBER];
	 bool stop_server = false;
	 bool timeout= false;
	 while(!stop_server)
	 {
		 int num = epoll_wait(epollfd,events,MAX_EVENT_NUMBER,-1);
		 if(num == -1)
		 {
			 if(errno == EINTR)
				 continue;
			 perror("epoll_wait:");
			 break;
		 }
		 for(int i = 0;i < num;i++)
		 {
			 int sockfd = events[i].data.fd;
			 /*处理新到的客户端连接*/
			 if(sockfd == listenfd)
			 {
				 memset(&connaddr,'\0',sizeof(connaddr));
				 int connfd = accept(listenfd,(struct sockaddr*)&connaddr,&connaddrlen);
				 addfd(connfd,true);//ET模式加入epoll中

				 printf("get a client,sconnfd = %d\n",connfd);
				 users[connfd].address = connaddr;
				 users[connfd].sockfd = connfd;
				 heap_timer* timer = new heap_timer(2*TIMEOUT);
				 timer->function = cb_function;
				 timer->user_data = &users[connfd];
				 users[connfd].timer = timer;
				 /*若该定时器为时间堆中第一个定时器,则设置alarm*/
				 if(min_heap.empty())
				 {
					 alarm(2*TIMEOUT);
				 }

				 min_heap.add_timer(timer);
			 }
			 /*处理信号*/
			 else if(sockfd == pipefd[0])
			 {
				 int ret = 0;
				 char signals[BUFSIZ];
				 while(1)
				 {
					 memset(signals,'\0',sizeof(signals));
					 ret = recv(sockfd,signals,sizeof(signals),0);
					 if(ret == -1)
					 {
						 if(errno != EAGAIN)
						 {
							 perror("recv signal error:");
						 }
						 break;
					 }
					 for(int i = 0;i < ret;i++)
					 {
						 switch(signals[i])
						 {
							 case SIGALRM:
							 {
								 /*不立即处理定时任务,先用timeout标记有定时
								   任务需要处理,优先处理I/O任务*/
								 timeout = true;
								 break;
							 }
							 case SIGTERM:
							 {
								 stop_server = true;
								 break;
							 }
						 }
					 }
				 }
			 }
			 else if(events[i].events & EPOLLIN){
				 while(true)
				 {
					 memset(users[sockfd].buf,'\0',sizeof(users[sockfd].buf));
					 int ret = recv(sockfd,users[sockfd].buf,sizeof(users[sockfd].buf),0);
					 if(ret == -1)
					 {
						 if(errno != EAGAIN)
						 {
							 /*如果发生错误,则关闭连接,移除定时任务*/
							 cb_function(&users[sockfd]);
							 min_heap.del_timer(users[sockfd].timer);
						 }
						 break;
					 }
					 else if(ret == 0)
					 {
						 /*若对端关闭,则关闭连接,移除定时任务*/
						 cb_function(&users[sockfd]);
						 min_heap.del_timer(users[sockfd].timer);
						 break;
					 }
					 send(sockfd,users[sockfd].buf,strlen(users[sockfd].buf),0);
				 }
				 if(users[sockfd].timer != NULL && users[sockfd].timer->function != NULL)
				 {
					 /*如果某个客户端有数据可读,则调整该连接的定时器,以延迟
					   该连接关闭的事件*/
					 time_t timeout = time(NULL) + (2*TIMEOUT);
					 printf("adjust timer once\n");
					 min_heap.adjust_timer(users[sockfd].timer,timeout);
				 }
			 }
		 }
		 if(timeout)
		 {
			 timer_handler();
			 timeout = false;
		 }
	 }
	 close(epollfd);
	 close(listenfd);
	 close(pipefd[0]);
	 close(pipefd[1]);
	 delete [] users;
	return 0;
}





参考:Linux高性能服务器编程 游双