欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  科技

Ceph网络通信机制与源码分析

程序员文章站 2022-06-27 22:49:50
Ceph网络通信机制与源码分析 作为一个分布式存储系统,Ceph自然需要一个稳定的网络通信模块,用于客户端和服务端,以及各个节点之间的消息通信。Ceph的网络模块位于源代码的ceph/src...
Ceph网络通信机制与源码分析

作为一个分布式存储系统,Ceph自然需要一个稳定的网络通信模块,用于客户端和服务端,以及各个节点之间的消息通信。Ceph的网络模块位于源代码的ceph/src/msg 下,该模块构造了网络通信的基本框架。在文件夹下还包含了三种接口的实现:simple、async、xio。由于simple比较简单,也是目前生产环境中可以使用的,所以就只介绍它。

simple对于每一个连接,都会创建两个线程,其中一个用于监听和读取该终端的读事件,另一个用于写事件。读线程得到请求以后会解析网络流并开始构建消息,然后派发到后面的 Dispatcher。写线程在大部分时候会处于 Sleep 状态,直到有新的消息需要发送才会被唤醒。

Messenger是网络模块的核心数据结构,负责接收/发送消息。OSD主要有两个Messenger:ms_public处于与客户端的消息,ms_cluster处理与其它OSD的消息。

Ceph网络通信机制与源码分析

基础类介绍 Message

该类是消息的基类,所有要发送的消息都是继承自该类的,它由消息头(header)、数据(user_data)、结束标记(footer)构成。

打开 ceph/src/msg/Message.h

class Message : public RefCountedObject {
protected:
  ceph_msg_header  header;      // headerelope
  ceph_msg_footer  footer;
  bufferlist       payload;  // "front" unaligned blob
  bufferlist       middle;   // "middle" unaligned blob
  bufferlist       data;     // data payload (page-alignment will be preserved where possible)

  ...
  ...
  ...

类中封装了ceph_msg_header 、ceph_msg_footer 。他们就是信息头和信息结束标志的结构体。数据则是由三部分组成的,分别是playload(一般保存相关元数据)、middle(留用)、data(读写数据)。

ceph_msg_header 主要是封装数据相关信息

class SimpleMessenger : public SimplePolicyMessenger {
   //是Messager接口的实现


public:
  Accepter accepter;//用来监听请求
  DispatchQueue dispatch_queue;//请求的队列

  friend class Accepter;

  //用于创建一个Pipe
  Pipe *connect_rank(const entity_addr_t& addr, int type, PipeConnection *con,
             Message *first);

  /**
   * Queue up a Message for delivery to the entity specified
   * by addr and dest_type.
   * submit_message() is responsible for creating
   * new Pipes (and closing old ones) as necessary.
   * 提交发送消息到发送队列,在必要时创建Pipe
   */
  void submit_message(Message *m, PipeConnection *con,
              const entity_addr_t& addr, int dest_type,
              bool already_locked);

  friend class Pipe;

 //在已存在的Pipe中查找
  Pipe *_lookup_pipe(const entity_addr_t& k) {
    ceph::unordered_map::iterator p = rank_pipe.find(k);
    if (p == rank_pipe.end())
      return NULL;
    // see lock cribbing in Pipe::fault()
    if (p->second->state_closed)
      return NULL;
    return p->second;
  }

} ;
Connection

看类的名字就知道他是干嘛的了,是用来发送消息、接收消息的的。

struct Connection : public RefCountedObject {
  mutable Mutex lock;
  Messenger *msgr;
  RefCountedObject *priv;
  int peer_type;
  entity_addr_t peer_addr;
  utime_t last_keepalive, last_keepalive_ack;
  ....
  ....
  ....
  /**
   * 判断是否能够发送消息了。。。。
   * @return true if ready to send, or false otherwise
   */
  virtual bool is_connected() = 0;
  ...
  ...
  /**
   * @param m The Message to send. The Messenger consumes a single reference
   * when you pass it in.
   * 主要功能就是这个 !!!用来发送消息的!!!!
   * @return 0 on success, or -errno on failure.
   */
  virtual int send_message(Message *m) = 0;

  int send_message(boost::intrusive_ptr m)
  {
    return send_message(m.detach()); /* send_message(Message *m) consumes a reference */
  }
  ...
  ...
  ...

};
Pipe

Pipe实现的就是开头提到的,对于每一个连接,都会在内部创建一个读线程、一个写线程用来处理接收消息和发送消息。它的层次位于Connetion和Dispatcher的中间,其中拥有读写线程pipe:: reader_thread和pipe::writer_thread,他们的入口函数分别为Pipe::reader和Pipe::writer函数。可以看到,其实具体的封装socket是在这个部分的吧。。。

class Pipe : public RefCountedObject {
    /**
     * The Reader thread handles all reads off the socket 
     * 读线程,用于接收消息
     */ 
    class Reader : public Thread {
      Pipe *pipe;
    public:
      explicit Reader(Pipe *p) : pipe(p) {}
      void *entry() override { pipe->reader(); return 0; }
    } reader_thread;

    /**
     * The Writer thread handles all writes to the socket (after startup).
     * 写线程用于发送消息
     */
    class Writer : public Thread {
      Pipe *pipe;
    public:
      explicit Writer(Pipe *p) : pipe(p) {}
      void *entry() override { pipe->writer(); return 0; }
    } writer_thread;
    ....
    ....
    ....
    map > out_q;  // priority queue 准备发送的消息队列
    DispatchQueue *in_q;//接收到消息的队列
    list sent;//当前要发送的消息
    Cond cond;
    bool send_keepalive;
    bool send_keepalive_ack;
    utime_t keepalive_ack_stamp;
    bool halt_delivery; //if a pipe's queue is destroyed, stop adding to it
    ....
    __u32 connect_seq, peer_global_seq;
    uint64_t out_seq;//发送序号
    uint64_t in_seq, in_seq_acked;//接收序号、ACK信号
    ....
    void set_socket_options();
    ....
    int read_message(Message **pm,
             AuthSessionHandler *session_security_copy);
    int write_message(const ceph_msg_header& h, const ceph_msg_footer& f, bufferlist& body);
    void start_reader();
    void start_writer();
    void shutdown_socket() {
      recv_reset();
      if (sd >= 0)
        ::shutdown(sd, SHUT_RDWR);
    }
    ...
    ...
    ...

  };
Dispatcher

这个类用于消息的分发,在Pipe中接收、发送队列中有很多请求,他就是负责把Message的请求分发给具体的应用层。

lass Dispatcher {
public:
  explicit Dispatcher(CephContext *cct_)
    : cct(cct_)
  {
  }
  virtual ~Dispatcher() { }

  /**
   * The Messenger calls this function to query if you are capable
   * of "fast dispatch"ing a message. Indicating that you can fast
   * dispatch it requires that you:
   * 1) Handle the Message quickly and without taking long-term contended
   * locks. (This function is likely to be called in-line with message
   * receipt.)
   * 2) Be able to accept the Message even if you have not yet received
   * an ms_handle_accept() notification for the Connection it is associated
   * with, and even if you *have* called mark_down() or received an
   * ms_handle_reset() (or similar) call on the Connection. You will
   * not receive more than one dead "message" (and should generally be
   * prepared for that circumstance anyway, since the normal dispatch can begin,
   * then trigger Connection failure before it's percolated through your system).
   * We provide ms_handle_fast_[connect|accept] calls if you need them, under
   * similar speed and state constraints as fast_dispatch itself.
   * 3) Be able to make a determination on fast_dispatch without relying
   * on particular system state -- the ms_can_fast_dispatch() call might
   * be called multiple times on a single message; the state might change between
   * calling ms_can_fast_dispatch and ms_fast_dispatch; etc.
   *
   * @param m The message we want to fast dispatch.
   * @returns True if the message can be fast dispatched; false otherwise.
   */

  /**
   * Perform a "fast dispatch" on a given message. See
   * ms_can_fast_dispatch() for the requirements.
   *
   * @param m The Message to fast dispatch.
   */
  virtual void ms_fast_dispatch(Message *m) { ceph_abort(); }

  /**
   * The Messenger calls this function to deliver a single message.
   *
   * @param m The message being delivered. You (the Dispatcher)
   * are given a single reference count on it.
   */
  virtual bool ms_dispatch(Message *m) = 0;

  /**
   * This function will be called whenever a Connection is newly-created
   * or reconnects in the Messenger.
   *
   * @param con The new Connection which has been established. You are not
   * granted a reference to it -- take one if you need one!
   */
  virtual void ms_handle_connect(Connection *con) {}

  /**
   * This function will be called synchronously whenever a Connection is
   * newly-created or reconnects in the Messenger, if you support fast
   * dispatch. It is guaranteed to be called before any messages are
   * dispatched.
   *
   * @param con The new Connection which has been established. You are not
   * granted a reference to it -- take one if you need one!
   */
  virtual void ms_handle_fast_connect(Connection *con) {}

  /**
   * Callback indicating we have accepted an incoming connection.
   *
   * @param con The (new or existing) Connection associated with the session
   */
  virtual void ms_handle_accept(Connection *con) {}
  ...
  ...
  ...

};
流程分析 消息的发送:

Ceph网络通信机制与源码分析

图片来自 Linux/2015-10/124549.htm" target="_blank">http://www.linuxidc.com/Linux/2015-10/124549.htm

SimpleMessager 首先获取对应的Connection

首先查找已有的Pipe,若没有则创建一个Pipe

获得connection后,调用发送函数 _send_message

_send_message 最终调用submit_message函数,它会查看Pipe的状态,若空则创建一个Pipe,若不空且状态不是关闭状态,那就把调用_send 把消息发送到out_q发送队列中,触发发送线程。

触发发送线程后 ,每个的Pipe负责使用Write_thread 来发送out_q的消息,入口函数为Pipi::write。

进入发送线程后,使用_get_next_outgoing来获取out_q中的一个消息,使用write_message来把消息发送出去

消息的接收

Ceph网络通信机制与源码分析

使用入口函数 Pipe::reader 启动接收线程

1) 判断状态后,使用tcp_read 获取tag

2)根据tag的类型获取不同类型的消息。

3) 调用 read_message 来接收消息,函数返回后,消息接收完成。

判断能不能使用fast_dispatch ,能则不将消息加入mqueue (DispatchQueue中)直接由Pipe调用ms_fast_dispatch函数处理,否则加入该队列,让DispatchQueue调用ms_dispatch 处理。