欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

一个基于C++11的定时器队列(timerfd,poll实现)

程序员文章站 2022-04-15 17:21:23
@[toc] 前言 最近小程序要用到定时器,找了一圈也没找到合适的,最后还是绕回来选择了muduo里面的TimerQueue,整理了下它的代码,独立了出来,因为实在懒得从头写一个 !。 原来的muduo中TimerQueue是专为EventLoop提供定时功能的组件,我在笔记[ muduo网络库学习 ......

目录

@


前言

最近小程序要用到定时器,找了一圈也没找到合适的,最后还是绕回来选择了muduo里面的timerqueue,整理了下它的代码,独立了出来,因为实在懒得从头写一个- -!。

原来的muduo中timerqueue是专为eventloop提供定时功能的组件,我在笔记
muduo网络库学习笔记(三)timerqueue定时器队列
中解读过muduo这块代码。现在反过来,eventloop作为timerqueue的组件:timerqueue启动后在后台开一个线程跑eventloop,eventloop里面进行阻塞的poll循环,只监听timerfd和eventfd,从而独立出来一个单独的定时器队列。

优点

[async] [thread-safe] [based on poll] [microseconds-level]

异步 :后台线程监视文件描述符动态。
线程安全 : 多线程安全的 支持异步插入定时器。
基于poll : 非休眠机制实现。
级别 : 微秒级别。

test

#include <chrono>
#include <iostream>
#include "logger.hpp"
#include "timerqueue.hpp"

// Demo timer callback: writes one debug log line, then prints the current
// system-clock time to stdout as a count of microseconds since the epoch.
void test()
{
  using std::chrono::microseconds;
  using std::chrono::system_clock;

  log_debug << "[test] : test timerque happended ";

  // Dividing a duration by a one-microsecond duration yields a plain count.
  const auto since_epoch = system_clock::now().time_since_epoch();
  const auto micros = since_epoch / microseconds(1);

  std::cout << "[test] : test timerque happended at " << micros << std::endl;
}


// Demo entry point: starts the singleton timer queue, schedules three
// one-shot timers (1.0, 1.0 and 3.0) and one periodic timer (5.0), then
// blocks on stdin so the queue's worker thread has time to fire them.
// NOTE(review): the delays look like seconds judging by the sample output.
int main()
{

  //logger::setloglevel(logger::trace);

  timerqueue* timer_queue = timerqueue::getinstance();
  timer_queue->start();
  timer_queue->runafter(1.0, test);
  timer_queue->runafter(1.0, test);
  timer_queue->runafter(3.0, test);

  timer_queue->runevery(5.0, test);

  // Wait for input before exiting. std::cin comes from the <iostream> this
  // file already includes; getchar() needed <cstdio>, which was never
  // included and only worked through a transitive include.
  std::cin.get();
  return 0;
}

./timer_queue_test
[test] : test timerque happended at 1548293190811373
[test] : test timerque happended at 1548293190811392
[test] : test timerque happended at 1548293192811787
[test] : test timerque happended at 1548293194811927
[test] : test timerque happended at 1548293199812081
[test] : test timerque happended at 1548293204812645
[test] : test timerque happended at 1548293209813508

源代码

timerqueue : https://github.com/bethlyrosedaisley/timerqueue/tree/master/timerqueue 欢迎收藏。

hpp

#ifndef _net_timerqueue_hh
#define _net_timerqueue_hh
#include "timestamp.hpp"

#include <stdint.h>
#include <set>
#include <vector>
#include <condition_variable>
#include <functional>
#include <thread>

// Small atomic integer wrapper built on the compiler's __sync builtins.
// The value starts at zero. Copying is disabled by declaring the copy
// operations private and never defining them.
template <typename t>
class atomicintegert
{
public:
  atomicintegert() : m_value(0) {}

  // Atomically reads the value: a compare-and-swap of 0 with 0 never changes
  // what is stored, but it returns what was there.
  t get()
  {
    return __sync_val_compare_and_swap(&m_value, 0, 0);
  }

  // Atomically adds one and returns the updated value.
  t incrementandget()
  {
    return addandget(1);
  }

  // Atomically subtracts one and returns the updated value.
  t decrementandget()
  {
    return addandget(-1);
  }

private:
  atomicintegert(const atomicintegert&);
  atomicintegert& operator=(const atomicintegert&);

  // Atomically adds x; the builtin returns the previous value, so the new
  // value is that plus x.
  t addandget(t x)
  {
    const t previous = __sync_fetch_and_add(&m_value, x);
    return previous + x;
  }

  volatile t m_value;
};

// Fixed-width instantiations of the atomic counter.
using atomicint32 = atomicintegert<int32_t>;
using atomicint64 = atomicintegert<int64_t>;

// One scheduled callback: what to run, when it expires, and whether (and how
// often) it repeats. Every timer takes a sequence number from a shared
// counter at construction. Non-copyable (copy operations are declared
// private and never defined).
class timer
{
public:
  using timercallback_t = std::function<void()>;

  // cb       - callback invoked by run()
  // when     - initial expiration time
  // interval - repeat interval; the timer repeats only when this is > 0.0
  timer(const timercallback_t& cb, timestamp when, double interval)
    : m_callback(cb),
      m_expiration(when),
      m_interval(interval),
      m_repeat(interval > 0.0),
      m_sequence(s_numcreated.incrementandget())
  {
  }

  // Invokes the stored callback.
  void run() const { m_callback(); }

  // Recomputes the expiration relative to `now` (defined out of line).
  void restart(timestamp now);

  timestamp expiration() const { return m_expiration; }
  bool repeat() const { return m_repeat; }
  int64_t sequence() const { return m_sequence; }

  // Current value of the shared creation counter.
  static int64_t numcreated() { return s_numcreated.get(); }

private:
  timer(const timer&);
  timer& operator=(const timer&);

  const timercallback_t m_callback;
  timestamp m_expiration;
  const double m_interval;
  const bool m_repeat;
  const int64_t m_sequence;

  // Incremented once per constructed timer; source of m_sequence.
  static atomicint64 s_numcreated;
};

///
/// An opaque identifier for a scheduled timer, used for canceling it.
///
/// Stores the timer's address together with its sequence number. Both
/// fields are private; timerqueue is a friend and reads them directly.
///
class timerid
{
 public:
  /// Creates an identifier that refers to no timer.
  timerid()
    : m_timer(nullptr),  // `null` is not a C++ identifier; nullptr is the C++11 null pointer
      m_sequence(0)
  {
  }

  /// Creates an identifier for `timer` with sequence number `seq`.
  timerid(timer* timer, int64_t seq)
    : m_timer(timer),
      m_sequence(seq)
  {
  }

  // default copy-ctor, dtor and assignment are okay

  friend class timerqueue;

 private:
  timer* m_timer;      // non-owning; this class never deletes it
  int64_t m_sequence;  // sequence number paired with m_timer
};

class channel;
class eventloop;

class timerqueue
{
private:
  timerqueue();
public:
  ~timerqueue();

  static timerqueue* getinstance()
  {
    static timerqueue instance;
    return &instance;
  }

  typedef std::function<void()> timercallback_t;

  // schedules the callback to be run at given time,
  void start();

  timerid runat(const timestamp& time, const timercallback_t& cb);
  timerid runafter(double delay, const timercallback_t& cb);
  timerid runevery(double interval, const timercallback_t& cb);

  void cancel(timerid timerid);

private:
  typedef std::pair<timestamp, timer*> entry;
  typedef std::set<entry> timerlist;
  typedef std::pair<timer*, int64_t> activetimer;
  typedef std::set<activetimer> activetimerset;

  timerid addtimer(const timercallback_t& cb, timestamp when, double interval = 0.0);
  void addtimerinloop(timer* timer);
  void cancelinloop(timerid timerid);
  //called when timerfd alarms
  void handleread();
  //move out all expired timers and return they.
  std::vector<entry> getexpired(timestamp now);
  bool insert(timer* timer);
  void reset(const std::vector<entry>& expired, timestamp now);

  std::thread m_thread;
  const int m_timerfd;
  eventloop* p_loop;
  channel* p_timerfdchannel;

  //timer list sorted by expiration
  timerlist m_timers;
  activetimerset m_activetimers;

  bool m_callingexpiredtimers; /*atomic*/
  activetimerset m_cancelingtimers;

  std::condition_variable m_wait_loop_init;
};

#endif

cpp

#include <stdint.h>
#include <assert.h>
#include <sys/timerfd.h>
#include <unistd.h>
#include <mutex>

#include "logger.hpp"
#include "channel.hpp"
#include "eventloop.hpp"
#include "timerqueue.hpp"

// Helpers wrapping the Linux timerfd calls used by timerqueue.
namespace timerfd
{

// Creates a non-blocking, close-on-exec timerfd on the monotonic clock.
// Returns the descriptor; a negative value is reported through log_sysfatal
// and then returned.
int createtimerfd()
{
  // The clock id and flags are case-sensitive macros; the lower-cased
  // spellings do not exist.
  const int timerfd = ::timerfd_create(CLOCK_MONOTONIC,
                                       TFD_NONBLOCK | TFD_CLOEXEC);
  log_trace << "createtimerfd() fd : " << timerfd;
  if (timerfd < 0)
  {
    log_sysfatal << "failed in timerfd_create";
  }
  return timerfd;
}

// Converts the distance between now and `when` into a relative timespec.
// The distance is clamped to at least 100 microseconds, so a time that is
// already in the past still produces a small positive timeout.
struct timespec howmuchtimefromnow(timestamp when)
{
  int64_t microseconds = when.microsecondssinceepoch()
                         - timestamp::now().microsecondssinceepoch();
  if (microseconds < 100)
  {
    microseconds = 100;
  }
  struct timespec ts;
  ts.tv_sec = static_cast<time_t>(
      microseconds / timestamp::kmicrosecondspersecond);
  ts.tv_nsec = static_cast<long>(
      (microseconds % timestamp::kmicrosecondspersecond) * 1000);
  return ts;
}

// Reads the 8-byte value from the timerfd and logs it together with `now`.
// A short or failed read is logged as an error.
void readtimerfd(int timerfd, timestamp now)
{
  // Zero-initialised so the trace line below never prints an indeterminate
  // value when read() fails and leaves the buffer untouched.
  uint64_t howmany = 0;
  const ssize_t n = ::read(timerfd, &howmany, sizeof howmany);
  log_trace << "timerqueue::handleread() " << howmany << " at " << now.tostring();
  if (n != sizeof howmany)
  {
    log_error << "timerqueue::handleread() reads " << n << " bytes instead of 8";
  }
}

// Re-arms the timerfd as a one-shot timer that fires at `expiration`.
// flags == 0 makes it_value relative, which matches howmuchtimefromnow().
void resettimerfd(int timerfd, timestamp expiration)
{
  // wake up loop by timerfd_settime()
  log_trace << "resettimerfd()";
  // Value-initialisation zeroes it_interval (no kernel-side repeat) without
  // bzero(), which needs <strings.h> that this file never includes.
  struct itimerspec newvalue = {};
  newvalue.it_value = howmuchtimefromnow(expiration);
  // The previous setting is not needed, so no old_value buffer is passed.
  const int ret = ::timerfd_settime(timerfd, 0, &newvalue, nullptr);
  if (ret)
  {
    log_syserr << "timerfd_settime()";
  }
}

} // namespace timerfd

// Makes the timerfd helpers (createtimerfd, readtimerfd, resettimerfd, ...)
// callable without qualification in the rest of this file.
using namespace timerfd;

// Definition of the counter that supplies timer sequence numbers; the
// atomicintegert default constructor starts it at zero.
atomicint64  timer::s_numcreated;

// Recomputes the expiration relative to `now`: a repeating timer expires one
// interval after `now`; any other timer gets an invalid expiration.
void timer::restart(timestamp now)
{
  if (!m_repeat)
  {
    m_expiration = timestamp::invalid();
    return;
  }

  m_expiration = timestamp::addtime(now, m_interval);
}

// Creates the timerfd. The event loop and channel are created later by
// start(), so their pointers start out null instead of indeterminate.
// Initialisers are listed in member declaration order.
timerqueue::timerqueue()
  : m_timerfd(createtimerfd()),
    p_loop(nullptr),
    p_timerfdchannel(nullptr),
    m_callingexpiredtimers(false)
{
}

// Stops the worker thread (if one was started), releases the loop, channel
// and timerfd, and deletes every timer still queued.
timerqueue::~timerqueue()
{
  // The loop and channel exist only after start() has launched the worker.
  // Without this guard a queue that was never started would dereference
  // pointers that were never set and join a thread that is not joinable.
  if (m_thread.joinable())
  {
    p_timerfdchannel->disableall();
    p_timerfdchannel->remove();
    p_loop->quit();
    m_thread.join();
    delete p_loop;
    delete p_timerfdchannel;
  }
  ::close(m_timerfd);

  // Timers still in the queue are owned by it.
  for (const entry& pending : m_timers)
  {
    delete pending.second;
  }
}

void timerqueue::start()
{

  bool b_inited = false;;

  m_thread = std::thread([this, &b_inited]()mutable {
    this->p_loop = new eventloop();
    this->p_timerfdchannel = new channel(this->p_loop, this->m_timerfd);
    this->p_timerfdchannel->setreadcallback(std::bind(&timerqueue::handleread, this));
    this->p_timerfdchannel->enablereading();
    b_inited = true;
    this->m_wait_loop_init.notify_all();
    this->p_loop->loop();
  });

  std::mutex mutex;
  std::unique_lock<std::mutex> lock(mutex);
  while(!b_inited){
    m_wait_loop_init.wait(lock);
  }
}

// Removes every timer whose expiration is not later than `now` from both
// m_timers and m_activetimers and returns the removed entries in expiration
// order.
std::vector<timerqueue::entry> timerqueue::getexpired(timestamp now)
{
  // The sentinel pairs `now` with the largest possible pointer value, so it
  // sorts after every real entry stamped `now`. lower_bound therefore yields
  // the first entry that expires strictly later than `now`.
  const entry sentry(now, reinterpret_cast<timer*>(UINTPTR_MAX));
  const timerlist::iterator firstpending = m_timers.lower_bound(sentry);
  assert(firstpending == m_timers.end() || now < firstpending->first);

  // Range construction replaces std::copy/back_inserter, whose headers this
  // file never includes.
  std::vector<entry> expired(m_timers.begin(), firstpending);
  m_timers.erase(m_timers.begin(), firstpending);

  for (std::vector<entry>::const_iterator due = expired.begin();
       due != expired.end(); ++due)
  {
    const activetimer key(due->second, due->second->sequence());
    const size_t n = m_activetimers.erase(key);
    assert(n == 1); (void)n;
  }

  assert(m_timers.size() == m_activetimers.size());

  return expired;
}


// Allocates a timer for cb and asks the loop to insert it via
// addtimerinloop(). Returns an id that can later be passed to cancel().
timerid timerqueue::addtimer(const timercallback_t& cb, timestamp when, double interval)
{
  // The variable must not be named `timer`: it would hide the class name
  // inside its own initialiser and make `new timer(...)` ill-formed.
  timer* newtimer = new timer(cb, when, interval);

  // Read the sequence number before the hand-off. Once the loop owns the
  // timer it may fire and delete it, so touching it afterwards would be a
  // use-after-free.
  const int64_t sequence = newtimer->sequence();

  p_loop->runinloop(std::bind(&timerqueue::addtimerinloop, this, newtimer));
  return timerid(newtimer, sequence);
}

// Inserts the timer into the queue and re-arms the timerfd when the new
// timer became the earliest one.
void timerqueue::addtimerinloop(timer* timer)
{
  p_loop->assertinloopthread();

  if (!insert(timer))
  {
    return;  // the earliest expiration is unchanged
  }

  resettimerfd(m_timerfd, timer->expiration());
}

// Hands the cancellation to the loop, which runs cancelinloop() with a copy
// of the id.
void timerqueue::cancel(timerid timerid)
{
  p_loop->runinloop([this, timerid]() { cancelinloop(timerid); });
}

// Cancels the timer identified by timerid.
// - If it is still queued, it is removed from both sets and deleted.
// - If it is not queued but expired callbacks are currently being run, it is
//   recorded in m_cancelingtimers so reset() will not re-arm it.
void timerqueue::cancelinloop(timerid timerid)
{
  p_loop->assertinloopthread();
  assert(m_timers.size() == m_activetimers.size());

  const activetimer target(timerid.m_timer, timerid.m_sequence);
  const activetimerset::iterator found = m_activetimers.find(target);
  if (found != m_activetimers.end())
  {
    timer* const doomed = found->first;
    const size_t n = m_timers.erase(entry(doomed->expiration(), doomed));
    assert(n == 1); (void)n;
    // The active set must drop the timer as well. The original left the
    // entry behind, which kept a dangling pointer in the set and broke the
    // size invariant asserted below.
    m_activetimers.erase(found);
    delete doomed;
  }
  else if (m_callingexpiredtimers)
  {
    m_cancelingtimers.insert(target);
  }

  assert(m_timers.size() == m_activetimers.size());
}

// Adds the timer to both m_timers and m_activetimers.
// Returns true when the timer is now the earliest to expire, i.e. the queue
// was empty or the timer expires before the previous first entry.
bool timerqueue::insert(timer* timer)
{
  p_loop->assertinloopthread();
  assert(m_timers.size() == m_activetimers.size());

  timestamp when = timer->expiration();
  const bool earliestchanged =
      m_timers.empty() || when < m_timers.begin()->first;

  const bool addedtotimers = m_timers.insert(entry(when, timer)).second;
  assert(addedtotimers); (void)addedtotimers;

  const bool addedtoactive =
      m_activetimers.insert(activetimer(timer, timer->sequence())).second;
  assert(addedtoactive); (void)addedtoactive;

  log_trace << "timerqueue::insert() " << "m_timers.size() : "
  << m_timers.size() << " m_activetimers.size() : " << m_activetimers.size();

  assert(m_timers.size() == m_activetimers.size());
  return earliestchanged;
}


void timerqueue::handleread()
{
  p_loop->assertinloopthread();
  timestamp now(timestamp::now());
  readtimerfd(m_timerfd, now);

  std::vector<entry> expired = getexpired(now);

  log_trace << "expired timer size " << expired.size() << "  ";

  m_callingexpiredtimers = true;
  m_cancelingtimers.clear();

  for(std::vector<entry>::iterator it = expired.begin();
      it != expired.end(); ++it )
  {
    it->second->run();
  }

  m_callingexpiredtimers = false;

  reset(expired, now);
}


void timerqueue::reset(const std::vector<entry>& expired, timestamp now)
{
  timestamp nextexpire;

  for(std::vector<entry>::const_iterator it = expired.begin();
      it != expired.end(); ++it)
  {
    activetimer timer(it->second, it->second->sequence());
    if(it->second->repeat()
      && m_cancelingtimers.find(timer) == m_cancelingtimers.end())
    {//如果是周期定时器则重新设定时间插入. 否则delete.
      it->second->restart(now);
      insert(it->second);
    }
    else
    {// fixme move to a free list no delete please
      delete it->second;
    }
  }

  if (!m_timers.empty())
  {
    nextexpire = m_timers.begin()->second->expiration();
  }

  if (nextexpire.valid())
  {
    resettimerfd(m_timerfd, nextexpire);
  }
}


// Schedules cb to run once at `time`; addtimer's default interval of 0.0
// makes the timer non-repeating.
timerid timerqueue::runat(const timestamp& time, const timercallback_t& cb)
{
  return addtimer(cb, time);
}

// Schedules cb to run once, `delay` after the current time.
timerid timerqueue::runafter(double delay, const timercallback_t& cb)
{
  return runat(timestamp::addtime(timestamp::now(), delay), cb);
}

// Schedules cb to run repeatedly: first one `interval` from now, then again
// every `interval`.
timerid timerqueue::runevery(double interval, const timercallback_t& cb)
{
  const timestamp firstrun = timestamp::addtime(timestamp::now(), interval);
  return addtimer(cb, firstrun, interval);
}