一个基于C++11的定时器队列(timerfd,poll实现)
目录
@
前言
最近小程序要用到定时器,找了一圈也没找到合适的,最后还是绕回来选择了muduo里面的timerqueue,整理了下它的代码,独立了出来,因为实在懒得从头写一个- -!。
原来的muduo中timerqueue是专为eventloop提供定时功能的组件,我在笔记
muduo网络库学习笔记(三)timerqueue定时器队列中解读过muduo这块代码。现在反过来,eventloop作为timerqueue的组件:timerqueue启动后在后台开一个线程跑eventloop,eventloop里面进行阻塞的poll循环,只监听timerfd和eventfd,从而独立出来一个单独的定时器队列。
优点
[async] [thread-safe] [based on poll] [microseconds-level]
异步
: 后台线程监视文件描述符动态。
线程安全
: 多线程安全,支持异步插入定时器。
基于poll
: 非休眠机制实现。
级别
: 微秒级别。
test
#include <chrono> #include <iostream> #include "logger.hpp" #include "timerqueue.hpp" void test() { log_debug << "[test] : test timerque happended "; std::cout << "[test] : test timerque happended at " << std::chrono::system_clock::now().time_since_epoch() / std::chrono::microseconds(1) << std::endl; } int main() { //logger::setloglevel(logger::trace); timerqueue* timer_queue = timerqueue::getinstance(); timer_queue->start(); timer_queue->runafter(1.0, test); timer_queue->runafter(1.0, test); timer_queue->runafter(3.0, test); timer_queue->runevery(5.0, test); getchar(); return 0; }
./timer_queue_test
[test] : test timerque happended at 1548293190811373
[test] : test timerque happended at 1548293190811392
[test] : test timerque happended at 1548293192811787
[test] : test timerque happended at 1548293194811927
[test] : test timerque happended at 1548293199812081
[test] : test timerque happended at 1548293204812645
[test] : test timerque happended at 1548293209813508
源代码
timerqueue
: https://github.com/bethlyrosedaisley/timerqueue/tree/master/timerqueue 欢迎收藏。
hpp
#ifndef _net_timerqueue_hh #define _net_timerqueue_hh #include "timestamp.hpp" #include <stdint.h> #include <set> #include <vector> #include <condition_variable> #include <functional> #include <thread> template <typename t> class atomicintegert { public: atomicintegert() :m_value(0) { } t get() { return __sync_val_compare_and_swap(&m_value, 0, 0); } t incrementandget() { return addandget(1); } t decrementandget() { return addandget(-1); } private: atomicintegert& operator=(const atomicintegert&); atomicintegert(const atomicintegert&); t getandadd(t x) { return __sync_fetch_and_add(&m_value, x); } t addandget(t x) { return getandadd(x) + x; } volatile t m_value; }; typedef atomicintegert<int32_t> atomicint32; typedef atomicintegert<int64_t> atomicint64; class timer{ public: typedef std::function<void()> timercallback_t; timer(const timercallback_t& cb, timestamp when, double interval) :m_callback(cb), m_expiration(when), m_interval(interval), m_repeat(interval > 0.0), m_sequence(s_numcreated.incrementandget()) { } void run() const { m_callback(); } timestamp expiration() const { return m_expiration; } bool repeat() const { return m_repeat; } int64_t sequence() const { return m_sequence; } void restart(timestamp now); static int64_t numcreated(){ return s_numcreated.get(); } private: timer& operator=(const timer&); timer(const timer&); const timercallback_t m_callback; timestamp m_expiration; const double m_interval; const bool m_repeat; const int64_t m_sequence; static atomicint64 s_numcreated; }; /// /// an opaque identifier, for canceling timer. 
/// class timerid { public: timerid() : m_timer(null), m_sequence(0) { } timerid(timer* timer, int64_t seq) : m_timer(timer), m_sequence(seq) { } // default copy-ctor, dtor and assignment are okay friend class timerqueue; private: //timerid& operator=(const timerid&); //timerid(const timerid&); timer* m_timer; int64_t m_sequence; }; class channel; class eventloop; class timerqueue { private: timerqueue(); public: ~timerqueue(); static timerqueue* getinstance() { static timerqueue instance; return &instance; } typedef std::function<void()> timercallback_t; // schedules the callback to be run at given time, void start(); timerid runat(const timestamp& time, const timercallback_t& cb); timerid runafter(double delay, const timercallback_t& cb); timerid runevery(double interval, const timercallback_t& cb); void cancel(timerid timerid); private: typedef std::pair<timestamp, timer*> entry; typedef std::set<entry> timerlist; typedef std::pair<timer*, int64_t> activetimer; typedef std::set<activetimer> activetimerset; timerid addtimer(const timercallback_t& cb, timestamp when, double interval = 0.0); void addtimerinloop(timer* timer); void cancelinloop(timerid timerid); //called when timerfd alarms void handleread(); //move out all expired timers and return they. std::vector<entry> getexpired(timestamp now); bool insert(timer* timer); void reset(const std::vector<entry>& expired, timestamp now); std::thread m_thread; const int m_timerfd; eventloop* p_loop; channel* p_timerfdchannel; //timer list sorted by expiration timerlist m_timers; activetimerset m_activetimers; bool m_callingexpiredtimers; /*atomic*/ activetimerset m_cancelingtimers; std::condition_variable m_wait_loop_init; }; #endif
cpp
#include <stdint.h> #include <assert.h> #include <sys/timerfd.h> #include <unistd.h> #include <mutex> #include "logger.hpp" #include "channel.hpp" #include "eventloop.hpp" #include "timerqueue.hpp" namespace timerfd { int createtimerfd() { int timerfd = ::timerfd_create(clock_monotonic, tfd_nonblock | tfd_cloexec); log_trace << "createtimerfd() fd : " << timerfd; if (timerfd < 0) { log_sysfatal << "failed in timerfd_create"; } return timerfd; } struct timespec howmuchtimefromnow(timestamp when) { int64_t microseconds = when.microsecondssinceepoch() - timestamp::now().microsecondssinceepoch(); if (microseconds < 100) { microseconds = 100; } struct timespec ts; ts.tv_sec = static_cast<time_t>( microseconds / timestamp::kmicrosecondspersecond); ts.tv_nsec = static_cast<long>( (microseconds % timestamp::kmicrosecondspersecond) * 1000); return ts; } void readtimerfd(int timerfd, timestamp now) { uint64_t howmany; ssize_t n = ::read(timerfd, &howmany, sizeof howmany); log_trace << "timerqueue::handleread() " << howmany << " at " << now.tostring(); if (n != sizeof howmany) { log_error << "timerqueue::handleread() reads " << n << " bytes instead of 8"; } } void resettimerfd(int timerfd, timestamp expiration) { // wake up loop by timerfd_settime() log_trace << "resettimerfd()"; struct itimerspec newvalue; struct itimerspec oldvalue; bzero(&newvalue, sizeof newvalue); bzero(&oldvalue, sizeof oldvalue); newvalue.it_value = howmuchtimefromnow(expiration); int ret = ::timerfd_settime(timerfd, 0, &newvalue, &oldvalue); if (ret) { log_syserr << "timerfd_settime()"; } } }; using namespace timerfd; atomicint64 timer::s_numcreated; void timer::restart(timestamp now) { if(m_repeat) { m_expiration = timestamp::addtime(now, m_interval); } else { m_expiration = timestamp::invalid(); } } timerqueue::timerqueue() :m_timerfd(createtimerfd()), m_callingexpiredtimers(false) { } timerqueue::~timerqueue() { p_timerfdchannel->disableall(); p_timerfdchannel->remove(); p_loop->quit(); 
m_thread.join(); delete p_loop; delete p_timerfdchannel; ::close(m_timerfd); for (timerlist::iterator it = m_timers.begin(); it != m_timers.end(); ++it) { delete it->second; } } void timerqueue::start() { bool b_inited = false;; m_thread = std::thread([this, &b_inited]()mutable { this->p_loop = new eventloop(); this->p_timerfdchannel = new channel(this->p_loop, this->m_timerfd); this->p_timerfdchannel->setreadcallback(std::bind(&timerqueue::handleread, this)); this->p_timerfdchannel->enablereading(); b_inited = true; this->m_wait_loop_init.notify_all(); this->p_loop->loop(); }); std::mutex mutex; std::unique_lock<std::mutex> lock(mutex); while(!b_inited){ m_wait_loop_init.wait(lock); } } std::vector<timerqueue::entry> timerqueue::getexpired(timestamp now) { std::vector<entry> expired; entry sentry = std::make_pair(now, reinterpret_cast<timer*>uintptr_max); timerlist::iterator it = m_timers.lower_bound(sentry); assert(it == m_timers.end() || now < it->first); std::copy(m_timers.begin(), it, back_inserter(expired)); m_timers.erase(m_timers.begin(), it); for(std::vector<entry>::iterator it = expired.begin(); it != expired.end(); ++it) { activetimer timer(it->second, it->second->sequence()); size_t n = m_activetimers.erase(timer); assert(n == 1); (void)n; } assert(m_timers.size() == m_activetimers.size()); return expired; } timerid timerqueue::addtimer(const timercallback_t& cb, timestamp when, double interval) { timer* timer = new timer(cb, when, interval); p_loop->runinloop(std::bind(&timerqueue::addtimerinloop, this, timer)); return timerid(timer, timer->sequence()); } void timerqueue::addtimerinloop(timer* timer) { p_loop->assertinloopthread(); bool earliestchanged = insert(timer); if (earliestchanged) { resettimerfd(m_timerfd, timer->expiration()); } } void timerqueue::cancel(timerid timerid) { p_loop->runinloop(std::bind(&timerqueue::cancelinloop, this, timerid)); } void timerqueue::cancelinloop(timerid timerid) { p_loop->assertinloopthread(); 
assert(m_timers.size() == m_activetimers.size()); activetimer timer(timerid.m_timer, timerid.m_sequence); activetimerset::iterator it = m_activetimers.find(timer); if(it != m_activetimers.end()) { size_t n = m_timers.erase(entry(it->first->expiration(), it->first)); assert(n == 1); delete it->first; } else if (m_callingexpiredtimers) { m_cancelingtimers.insert(timer); } assert(m_timers.size() == m_activetimers.size()); } bool timerqueue::insert(timer* timer) { p_loop->assertinloopthread(); assert(m_timers.size() == m_activetimers.size()); bool earliestchanged = false; timestamp when = timer->expiration(); timerlist::iterator it = m_timers.begin(); if (it == m_timers.end() || when < it->first) { earliestchanged = true; } { std::pair<timerlist::iterator, bool> result = m_timers.insert(entry(when, timer)); assert(result.second); (void)result; } { std::pair<activetimerset::iterator, bool> result = m_activetimers.insert(activetimer(timer, timer->sequence())); assert(result.second); (void)result; } log_trace << "timerqueue::insert() " << "m_timers.size() : " << m_timers.size() << " m_activetimers.size() : " << m_activetimers.size(); assert(m_timers.size() == m_activetimers.size()); return earliestchanged; } void timerqueue::handleread() { p_loop->assertinloopthread(); timestamp now(timestamp::now()); readtimerfd(m_timerfd, now); std::vector<entry> expired = getexpired(now); log_trace << "expired timer size " << expired.size() << " "; m_callingexpiredtimers = true; m_cancelingtimers.clear(); for(std::vector<entry>::iterator it = expired.begin(); it != expired.end(); ++it ) { it->second->run(); } m_callingexpiredtimers = false; reset(expired, now); } void timerqueue::reset(const std::vector<entry>& expired, timestamp now) { timestamp nextexpire; for(std::vector<entry>::const_iterator it = expired.begin(); it != expired.end(); ++it) { activetimer timer(it->second, it->second->sequence()); if(it->second->repeat() && m_cancelingtimers.find(timer) == m_cancelingtimers.end()) 
{//如果是周期定时器则重新设定时间插入. 否则delete. it->second->restart(now); insert(it->second); } else {// fixme move to a free list no delete please delete it->second; } } if (!m_timers.empty()) { nextexpire = m_timers.begin()->second->expiration(); } if (nextexpire.valid()) { resettimerfd(m_timerfd, nextexpire); } } timerid timerqueue::runat(const timestamp& time, const timercallback_t& cb) { return addtimer(cb, time, 0.0); } timerid timerqueue::runafter(double delay, const timercallback_t& cb) { timestamp time(timestamp::addtime(timestamp::now(), delay)); return runat(time, cb); } timerid timerqueue::runevery(double interval, const timercallback_t& cb) { timestamp time(timestamp::addtime(timestamp::now(), interval)); return addtimer(cb, time, interval); }