欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  科技

ceph源码分析--monitortick详情

程序员文章站 2022-07-01 18:31:48
刚入职的时候曾经定位过一个ceph集群时钟回调8小时的单子。后来投入到了项目中。 正式入职答辩的时候,主管问,那这个tick线程是定时调用的吗?当时被问懵了,因为自己并没有深入的前后看到这块...

刚入职的时候曾经定位过一个ceph集群时钟回调8小时的单子。后来投入到了项目中。
正式入职答辩的时候,主管问,那这个tick线程是定时调用的吗?当时被问懵了,因为自己并没有深入的前后看到这块。事情总要有头有尾,遗留的问题总要解决,写这篇博客的目的就是回答当时的遗留问题。究竟monitor的这个tick是怎么跑的?

Monitor的tick线程启动
同时调用PaxosService的tick函数

int Monitor::init()

int Monitor::init()
{
  dout(2) << "init" << dendl;
  Mutex::Locker l(lock);

  finisher.start();

  // start ticker
  timer.init();
  new_tick();
  ···
}

void Monitor::new_tick()

void Monitor::new_tick()
{
  timer.add_event_after(g_conf->mon_tick_interval, new C_MonContext(this, [this](int) {
    tick();
      }));
}

时间间隔为5s

OPTION(mon_tick_interval, OPT_INT)

void Monitor::tick()

void Monitor::tick()
{
  // ok go.
  dout(11) << "tick" << dendl;
  const utime_t now = ceph_clock_now();

  // Check if we need to emit any delayed health check updated messages
  if (is_leader()) {
    const auto min_period = g_conf->get_val(
                              "mon_health_log_update_period");
    for (auto& svc : paxos_service) {
      auto health = svc->get_health_checks();

      for (const auto &i : health.checks) {
        const std::string &code = i.first;
        const std::string &summary = i.second.summary;
        const health_status_t severity = i.second.severity;

        auto status_iter = health_check_log_times.find(code);
        if (status_iter == health_check_log_times.end()) {
          continue;
        }

        auto &log_status = status_iter->second;
        bool const changed = log_status.last_message != summary
                             || log_status.severity != severity;

        if (changed && now - log_status.updated_at > min_period) {
          log_status.last_message = summary;
          log_status.updated_at = now;
          log_status.severity = severity;

          ostringstream ss;
          ss << "Health check update: " << summary << " (" << code << ")";
          clog->health(severity) << ss.str();
        }
      }
    }
  }


  for (vector::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) {
    (*p)->tick();
    (*p)->maybe_trim();
  }

  // trim sessions
  {
    Mutex::Locker l(session_map_lock);
    auto p = session_map.sessions.begin();

    bool out_for_too_long = (!exited_quorum.is_zero() &&
                 now > (exited_quorum + 2*g_conf->mon_lease));

    while (!p.end()) {
      MonSession *s = *p;
      ++p;

      // don't trim monitors
      if (s->inst.name.is_mon())
    continue;

      if (s->session_timeout < now && s->con) {
    // check keepalive, too
    s->session_timeout = s->con->get_last_keepalive();
    s->session_timeout += g_conf->mon_session_timeout;
      }
      if (s->session_timeout < now) {
    dout(10) << " trimming session " << s->con << " " << s->inst
         << " (timeout " << s->session_timeout
         << " < now " << now << ")" << dendl;
      } else if (out_for_too_long) {
    // boot the client Session because we've taken too long getting back in
    dout(10) << " trimming session " << s->con << " " << s->inst
         << " because we've been out of quorum too long" << dendl;
      } else {
    continue;
      }

      s->con->mark_down();
      remove_session(s);
      logger->inc(l_mon_session_trim);
    }
  }
  sync_trim_providers();

  if (!maybe_wait_for_quorum.empty()) {
    finish_contexts(g_ceph_context, maybe_wait_for_quorum);
  }

  if (is_leader() && paxos->is_active() && fingerprint.is_zero()) {
    // this is only necessary on upgraded clusters.
    MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
    prepare_new_fingerprint(t);
    paxos->trigger_propose();
  }
  //new_tick()
  new_tick();
}*>

后续还有各个PaxosService的tick,还在建设中。