Ceph网络通信机制与源码分析
作为一个分布式存储系统,Ceph自然需要一个稳定的网络通信模块,用于客户端和服务端,以及各个节点之间的消息通信。Ceph的网络模块位于源代码的ceph/src/msg 下,该模块构造了网络通信的基本框架。在文件夹下还包含了三种接口的实现:simple、async、xio。由于simple比较简单,也是目前生产环境中可以使用的,所以就只介绍它。
simple对于每一个连接,都会创建两个线程,其中一个用于监听和读取该终端的读事件,另一个用于写事件。读线程得到请求以后会解析网络流并开始构建消息,然后派发到后面的 Dispatcher。写线程在大部分时候会处于 Sleep 状态,直到有新的消息需要发送才会被唤醒。
Messenger是网络模块的核心数据结构,负责接收/发送消息。OSD主要有两个Messenger:ms_public处于与客户端的消息,ms_cluster处理与其它OSD的消息。
基础类介绍 Message该类是消息的基类,所有要发送的消息都是继承自该类的,它由消息头(header)、数据(user_data)、结束标记(footer)构成。
打开 ceph/src/msg/Message.h
class Message : public RefCountedObject { protected: ceph_msg_header header; // headerelope ceph_msg_footer footer; bufferlist payload; // "front" unaligned blob bufferlist middle; // "middle" unaligned blob bufferlist data; // data payload (page-alignment will be preserved where possible) ... ... ...
类中封装了ceph_msg_header 、ceph_msg_footer 。他们就是信息头和信息结束标志的结构体。数据则是由三部分组成的,分别是playload(一般保存相关元数据)、middle(留用)、data(读写数据)。
ceph_msg_header 主要是封装数据相关信息
class SimpleMessenger : public SimplePolicyMessenger { //是Messager接口的实现 public: Accepter accepter;//用来监听请求 DispatchQueue dispatch_queue;//请求的队列 friend class Accepter; //用于创建一个Pipe Pipe *connect_rank(const entity_addr_t& addr, int type, PipeConnection *con, Message *first); /** * Queue up a Message for delivery to the entity specified * by addr and dest_type. * submit_message() is responsible for creating * new Pipes (and closing old ones) as necessary. * 提交发送消息到发送队列,在必要时创建Pipe */ void submit_message(Message *m, PipeConnection *con, const entity_addr_t& addr, int dest_type, bool already_locked); friend class Pipe; //在已存在的Pipe中查找 Pipe *_lookup_pipe(const entity_addr_t& k) { ceph::unordered_map::iterator p = rank_pipe.find(k); if (p == rank_pipe.end()) return NULL; // see lock cribbing in Pipe::fault() if (p->second->state_closed) return NULL; return p->second; } } ;Connection
看类的名字就知道他是干嘛的了,是用来发送消息、接收消息的的。
struct Connection : public RefCountedObject { mutable Mutex lock; Messenger *msgr; RefCountedObject *priv; int peer_type; entity_addr_t peer_addr; utime_t last_keepalive, last_keepalive_ack; .... .... .... /** * 判断是否能够发送消息了。。。。 * @return true if ready to send, or false otherwise */ virtual bool is_connected() = 0; ... ... /** * @param m The Message to send. The Messenger consumes a single reference * when you pass it in. * 主要功能就是这个 !!!用来发送消息的!!!! * @return 0 on success, or -errno on failure. */ virtual int send_message(Message *m) = 0; int send_message(boost::intrusive_ptr m) { return send_message(m.detach()); /* send_message(Message *m) consumes a reference */ } ... ... ... };Pipe
Pipe实现的就是开头提到的,对于每一个连接,都会在内部创建一个读线程、一个写线程用来处理接收消息和发送消息。它的层次位于Connetion和Dispatcher的中间,其中拥有读写线程pipe:: reader_thread和pipe::writer_thread,他们的入口函数分别为Pipe::reader和Pipe::writer函数。可以看到,其实具体的封装socket是在这个部分的吧。。。
class Pipe : public RefCountedObject { /** * The Reader thread handles all reads off the socket * 读线程,用于接收消息 */ class Reader : public Thread { Pipe *pipe; public: explicit Reader(Pipe *p) : pipe(p) {} void *entry() override { pipe->reader(); return 0; } } reader_thread; /** * The Writer thread handles all writes to the socket (after startup). * 写线程用于发送消息 */ class Writer : public Thread { Pipe *pipe; public: explicit Writer(Pipe *p) : pipe(p) {} void *entry() override { pipe->writer(); return 0; } } writer_thread; .... .... .... map > out_q; // priority queue 准备发送的消息队列 DispatchQueue *in_q;//接收到消息的队列 list sent;//当前要发送的消息 Cond cond; bool send_keepalive; bool send_keepalive_ack; utime_t keepalive_ack_stamp; bool halt_delivery; //if a pipe's queue is destroyed, stop adding to it .... __u32 connect_seq, peer_global_seq; uint64_t out_seq;//发送序号 uint64_t in_seq, in_seq_acked;//接收序号、ACK信号 .... void set_socket_options(); .... int read_message(Message **pm, AuthSessionHandler *session_security_copy); int write_message(const ceph_msg_header& h, const ceph_msg_footer& f, bufferlist& body); void start_reader(); void start_writer(); void shutdown_socket() { recv_reset(); if (sd >= 0) ::shutdown(sd, SHUT_RDWR); } ... ... ... };Dispatcher
这个类用于消息的分发,在Pipe中接收、发送队列中有很多请求,他就是负责把Message的请求分发给具体的应用层。
lass Dispatcher { public: explicit Dispatcher(CephContext *cct_) : cct(cct_) { } virtual ~Dispatcher() { } /** * The Messenger calls this function to query if you are capable * of "fast dispatch"ing a message. Indicating that you can fast * dispatch it requires that you: * 1) Handle the Message quickly and without taking long-term contended * locks. (This function is likely to be called in-line with message * receipt.) * 2) Be able to accept the Message even if you have not yet received * an ms_handle_accept() notification for the Connection it is associated * with, and even if you *have* called mark_down() or received an * ms_handle_reset() (or similar) call on the Connection. You will * not receive more than one dead "message" (and should generally be * prepared for that circumstance anyway, since the normal dispatch can begin, * then trigger Connection failure before it's percolated through your system). * We provide ms_handle_fast_[connect|accept] calls if you need them, under * similar speed and state constraints as fast_dispatch itself. * 3) Be able to make a determination on fast_dispatch without relying * on particular system state -- the ms_can_fast_dispatch() call might * be called multiple times on a single message; the state might change between * calling ms_can_fast_dispatch and ms_fast_dispatch; etc. * * @param m The message we want to fast dispatch. * @returns True if the message can be fast dispatched; false otherwise. */ /** * Perform a "fast dispatch" on a given message. See * ms_can_fast_dispatch() for the requirements. * * @param m The Message to fast dispatch. */ virtual void ms_fast_dispatch(Message *m) { ceph_abort(); } /** * The Messenger calls this function to deliver a single message. * * @param m The message being delivered. You (the Dispatcher) * are given a single reference count on it. */ virtual bool ms_dispatch(Message *m) = 0; /** * This function will be called whenever a Connection is newly-created * or reconnects in the Messenger. * * @param con The new Connection which has been established. You are not * granted a reference to it -- take one if you need one! */ virtual void ms_handle_connect(Connection *con) {} /** * This function will be called synchronously whenever a Connection is * newly-created or reconnects in the Messenger, if you support fast * dispatch. It is guaranteed to be called before any messages are * dispatched. * * @param con The new Connection which has been established. You are not * granted a reference to it -- take one if you need one! */ virtual void ms_handle_fast_connect(Connection *con) {} /** * Callback indicating we have accepted an incoming connection. * * @param con The (new or existing) Connection associated with the session */ virtual void ms_handle_accept(Connection *con) {} ... ... ... };流程分析 消息的发送:
图片来自 Linux/2015-10/124549.htm" target="_blank">http://www.linuxidc.com/Linux/2015-10/124549.htm
SimpleMessager 首先获取对应的Connection
首先查找已有的Pipe,若没有则创建一个Pipe
获得connection后,调用发送函数 _send_message
_send_message 最终调用submit_message函数,它会查看Pipe的状态,若空则创建一个Pipe,若不空且状态不是关闭状态,那就把调用_send 把消息发送到out_q发送队列中,触发发送线程。
触发发送线程后 ,每个的Pipe负责使用Write_thread 来发送out_q的消息,入口函数为Pipi::write。
进入发送线程后,使用_get_next_outgoing来获取out_q中的一个消息,使用write_message来把消息发送出去
消息的接收使用入口函数 Pipe::reader 启动接收线程
1) 判断状态后,使用tcp_read 获取tag
2)根据tag的类型获取不同类型的消息。
3) 调用 read_message 来接收消息,函数返回后,消息接收完成。
判断能不能使用fast_dispatch ,能则不将消息加入mqueue (DispatchQueue中)直接由Pipe调用ms_fast_dispatch函数处理,否则加入该队列,让DispatchQueue调用ms_dispatch 处理。
上一篇: 8.C#知识点:委托和事件
推荐阅读
-
spring源码分析系列5:ApplicationContext的初始化与Bean生命周期
-
spring源码分析6: ApplicationContext的初始化与BeanDefinition的搜集入库
-
Qt事件分发机制源码分析之QApplication对象构建过程
-
Netty源码分析之ChannelPipeline(二)—ChannelHandler的添加与删除
-
[Abp vNext 源码分析] - 7. 权限与验证
-
Python专用方法与迭代机制实例分析
-
[五]类加载机制双亲委派机制 底层代码实现原理 源码分析 java类加载双亲委派机制是如何实现的
-
源码分析--ConcurrentHashMap与HashTable(JDK1.8)
-
jQuery源码分析之jQuery.fn.each与jQuery.each用法
-
PHP中变量引用与变量销毁机制分析