Source Code Reading: How Storm Operates on ZooKeeper (cluster.clj)
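The main functions Storm uses to operate on ZooKeeper are defined in the namespace backtype.storm.cluster, that is, in the file cluster.clj. backtype.storm.cluster defines two important protocols: ClusterState and StormClusterState.
A Clojure protocol can be thought of as a Java interface: it bundles a set of methods. The ClusterState protocol bundles the basic functions for interacting with ZooKeeper, such as getting a node's children or a node's data. It is defined as follows:
ClusterState protocol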
(defprotocol ClusterState
  (set-ephemeral-node [this path data])
  (delete-node [this path])
  (create-sequential [this path data])
  ;; if node does not exist, create persistent with this data
  (set-data [this path data])
  (get-data [this path watch?])
  (get-version [this path watch?])
  (get-data-with-version [this path watch?])
  (get-children [this path watch?])
  (mkdirs [this path])
  (close [this])
  (register [this callback])
  (unregister [this id]))
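A protocol is roughly a Java interface, so any object created with reify that implements its methods can be used wherever the protocol is expected. The following is a minimal sketch built on stated assumptions. MiniClusterState and mk-memory-cluster-state are hypothetical names. The protocol is a three-method cut-down of ClusterState, and nodes live in an in-memory atom instead of ZooKeeper (watch? is accepted but ignored).

```clojure
(require '[clojure.string :as str])

;; Hypothetical three-method cut-down of the ClusterState protocol.
(defprotocol MiniClusterState
  (set-data [this path data])
  (get-data [this path watch?])
  (get-children [this path watch?]))

(defn parent-path
  "Parent of a slash-separated path: \"/a/b\" -> \"/a\", \"/a\" -> \"/\"."
  [path]
  (let [idx (str/last-index-of path "/")]
    (if (pos? idx) (subs path 0 idx) "/")))

(defn mk-memory-cluster-state
  "In-memory stand-in for a ZooKeeper-backed ClusterState (hypothetical).
  Nodes live in an atom mapping path -> data; watch? is ignored."
  []
  (let [nodes (atom {})]
    ;; reify ~ an anonymous Java class implementing the interface
    (reify MiniClusterState
      (set-data [this path data]
        ;; overwrite if present, create otherwise
        (swap! nodes assoc path data)
        nil)
      (get-data [this path watch?]
        (get @nodes path))
      (get-children [this path watch?]
        ;; last path segment of every node whose parent is path
        (->> (keys @nodes)
             (filter #(= path (parent-path %)))
             (map #(subs % (inc (str/last-index-of % "/"))))
             sort
             vec)))))

;; usage
(def cs (mk-memory-cluster-state))
(set-data cs "/storms/topo-1" "base-1")
(set-data cs "/storms/topo-2" "base-2")
```

Here `(satisfies? MiniClusterState cs)` returns true. That is the same instanceof-style check mk-storm-cluster-state performs against ClusterState.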
The StormClusterState protocol bundles the functions Storm uses to interact with ZooKeeper. Its functions can be viewed as "compositions" of ClusterState functions. It is defined as follows:
StormClusterState protocol
(defprotocol StormClusterState
  (assignments [this callback])
  (assignment-info [this storm-id callback])
  (assignment-info-with-version [this storm-id callback])
  (assignment-version [this storm-id callback])
  (active-storms [this])
  (storm-base [this storm-id callback])
  (get-worker-heartbeat [this storm-id node port])
  (executor-beats [this storm-id executor->node+port])
  (supervisors [this callback])
  (supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist
  (setup-heartbeats! [this storm-id])
  (teardown-heartbeats! [this storm-id])
  (teardown-topology-errors! [this storm-id])
  (heartbeat-storms [this])
  (error-topologies [this])
  (worker-heartbeat! [this storm-id node port info])
  (remove-worker-heartbeat! [this storm-id node port])
  (supervisor-heartbeat! [this supervisor-id info])
  (activate-storm! [this storm-id storm-base])
  (update-storm! [this storm-id new-elems])
  (remove-storm-base! [this storm-id])
  (set-assignment! [this storm-id info])
  (remove-storm! [this storm-id])
  (report-error [this storm-id task-id node port error])
  (errors [this storm-id task-id])
  (disconnect [this]))
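To illustrate the "composition" idea, here is a hedged sketch in which each high-level call is a path computation, one low-level call, and (de)serialization. RawState and TopologyState are hypothetical two-method stand-ins for ClusterState and StormClusterState. assignment-path follows the /assignments/{storm-id} layout described in this article. pr-str and clojure.edn/read-string are my own stand-in for Storm's real serializer.

```clojure
(require '[clojure.edn :as edn])

;; Hypothetical low-level layer (subset of ClusterState): data by path.
(defprotocol RawState
  (set-data [this path data])
  (get-data [this path watch?]))

;; Hypothetical high-level layer (subset of StormClusterState).
(defprotocol TopologyState
  (set-assignment! [this storm-id info])
  (assignment-info [this storm-id callback]))

(defn assignment-path [storm-id]
  (str "/assignments/" storm-id))

(defn mk-raw-state []
  (let [nodes (atom {})]
    (reify RawState
      (set-data [_ path data] (swap! nodes assoc path data) nil)
      (get-data [_ path _watch?] (get @nodes path)))))

(defn mk-topology-state
  "Every method delegates to the raw layer; pr-str / edn/read-string
  stand in for Storm's serializer (an assumption of this sketch)."
  [raw]
  (reify TopologyState
    (set-assignment! [_ storm-id info]
      (set-data raw (assignment-path storm-id) (pr-str info)))
    (assignment-info [_ storm-id callback]
      ;; ask for a watch only when a callback is supplied, as the real code does
      (some-> (get-data raw (assignment-path storm-id) (some? callback))
              edn/read-string))))

;; usage
(def raw (mk-raw-state))
(def ts (mk-topology-state raw))
(set-assignment! ts "topo-1" {:node->host {"node-a" "host-a"}})
```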
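Besides the ClusterState and StormClusterState protocols, the backtype.storm.cluster namespace also defines two important functions: mk-distributed-cluster-state and mk-storm-cluster-state.
The mk-distributed-cluster-state function returns an object that implements the ClusterState protocol. Storm uses this object to interact with ZooKeeper. It is defined as follows:
mk-distributed-cluster-state function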
(defn mk-distributed-cluster-state
  ;; conf holds the settings from storm.yaml as a map
  [conf]
  ;; zk is bound to a ZooKeeper client; Storm talks to ZooKeeper through CuratorFramework
  (let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf conf)]
    ;; create Storm's root directory on ZooKeeper (/storm by default)
    (zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT))
    (.close zk))
  ;; callbacks holds the registered callback functions as a map
  (let [callbacks (atom {})
        ;; active marks whether this client is still open (close resets it to false)
        active (atom true)
        ;; zk is rebound to a new client that has a watcher. When state in the ZooKeeper cluster changes, the server
        ;; sends the client an event, and the watcher calls the functions in callbacks to handle it.
        ;; When Nimbus starts, callbacks is empty, so Nimbus calls no callback on receiving an event; when a Supervisor
        ;; starts, callbacks have been registered, so the Supervisor calls them when the server sends an event.
        ;; mk-client is defined in zookeeper.clj; see its definition later in this article.
        zk (zk/mk-client conf
                         (conf STORM-ZOOKEEPER-SERVERS)
                         (conf STORM-ZOOKEEPER-PORT)
                         :auth-conf conf
                         :root (conf STORM-ZOOKEEPER-ROOT)
                         ;; :watcher sets the client's default watcher. state is the client's current state, type is
                         ;; the event type, and path is the znode on which the event occurred.
                         ;; The watcher's job is to run the functions in callbacks; those functions are added by
                         ;; mk-storm-cluster-state through ClusterState's register.
                         :watcher (fn [state type path]
                                    (when @active
                                      (when-not (= :connected state)
                                        (log-warn "Received event " state ":" type ":" path " with disconnected Zookeeper."))
                                      (when-not (= :none type)
                                        (doseq [callback (vals @callbacks)]
                                          (callback type path))))))]
    ;; reify is roughly Java's "implements": it creates an anonymous object implementing the protocol
    (reify
      ClusterState
      ;; register adds a callback to callbacks under a freshly generated UUID key and returns that key
      (register
        [this callback]
        (let [id (uuid)]
          (swap! callbacks assoc id callback)
          id))
      ;; unregister removes the callback with the given key from callbacks
      (unregister
        [this id]
        (swap! callbacks dissoc id))
      ;; create an ephemeral node on ZooKeeper (if it already exists, overwrite its data)
      (set-ephemeral-node
        [this path data]
        (zk/mkdirs zk (parent-path path))
        (if (zk/exists zk path false)
          (try-cause
            (zk/set-data zk path data) ; should verify that it's ephemeral
            (catch KeeperException$NoNodeException e
              (log-warn-error e "Ephemeral node disappeared between checking for existing and setting data")
              (zk/create-node zk path data :ephemeral)))
          (zk/create-node zk path data :ephemeral)))
      ;; create a sequential node on ZooKeeper
      (create-sequential
        [this path data]
        (zk/create-node zk path data :sequential))
      ;; set a node's data; if the node does not exist, create it (and its parents) as a persistent node
      (set-data
        [this path data]
        ;; note: this does not turn off any existing watches
        (if (zk/exists zk path false)
          (zk/set-data zk path data)
          (do
            (zk/mkdirs zk (parent-path path))
            (zk/create-node zk path data :persistent))))
      ;; delete the given node (recursively)
      (delete-node
        [this path]
        (zk/delete-recursive zk path))
      ;; get a node's data. path is the node path; watch? is a boolean saying whether to "watch" the node.
      ;; If it is true, then after the node's data is modified (e.g. via set-data) an event is sent to the client,
      ;; which calls the default watcher given when the client was created (the function bound to :watcher)
      (get-data
        [this path watch?]
        (zk/get-data zk path watch?))
      ;; like get-data, but also returns the data version, i.e. how many times the node's data has been modified
      (get-data-with-version
        [this path watch?]
        (zk/get-data-with-version zk path watch?))
      ;; get a node's version; watch? means the same as in get-data
      (get-version
        [this path watch?]
        (zk/get-version zk path watch?))
      ;; get the list of a node's children; watch? means the same as in get-data
      (get-children
        [this path watch?]
        (zk/get-children zk path watch?))
      ;; create a node (and any missing parents) on ZooKeeper
      (mkdirs
        [this path]
        (zk/mkdirs zk path))
      ;; close the ZooKeeper client
      (close
        [this]
        (reset! active false)
        (.close zk)))))
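The callback plumbing above can be tried without a ZooKeeper connection. The following is a hedged sketch that keeps only the callbacks and active atoms, register/unregister, and the :watcher function. mk-callback-registry is a hypothetical name. A map of functions replaces the reify. The keyword event types such as :node-data-changed are my assumption of how zookeeper.clj names ZooKeeper's event types.

```clojure
(defn mk-callback-registry
  "Hypothetical sketch of the callback plumbing in mk-distributed-cluster-state,
  with the ZooKeeper client left out. Returns a map of functions."
  []
  (let [callbacks (atom {})
        active (atom true)]
    {:register (fn [callback]
                 ;; key each callback by a random UUID string and return the key
                 (let [id (str (java.util.UUID/randomUUID))]
                   (swap! callbacks assoc id callback)
                   id))
     :unregister (fn [id]
                   (swap! callbacks dissoc id)
                   nil)
     ;; what would be passed to mk-client as :watcher
     :watcher (fn [state type path]
                (when @active
                  (when-not (= :connected state)
                    (println "Received event" state type path "with disconnected ZooKeeper"))
                  ;; :none events only report connection-state changes, so they are not fanned out
                  (when-not (= :none type)
                    (doseq [callback (vals @callbacks)]
                      (callback type path)))))
     :close (fn [] (reset! active false))}))

;; usage
(def reg (mk-callback-registry))
(def seen (atom []))
(def cb-id ((:register reg) (fn [type path] (swap! seen conj [type path]))))
((:watcher reg) :connected :node-data-changed "/assignments/topo-1")
((:watcher reg) :connected :none nil) ; connection-state event: not fanned out
((:unregister reg) cb-id)
((:watcher reg) :connected :node-children-changed "/supervisors") ; no callbacks left
```

The watcher itself knows nothing about Storm. Whoever calls register decides what happens on each event.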
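The mk-storm-cluster-state function is very important. It returns an instance implementing the StormClusterState protocol, and this instance makes it more convenient for Storm to interact with ZooKeeper.
The startup functions of both Nimbus and Supervisor call mk-storm-cluster-state. How Nimbus and Supervisor start up will be covered in later articles.
mk-storm-cluster-state is defined as follows:
mk-storm-cluster-state function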
(defn mk-storm-cluster-state
  [cluster-state-spec]
  ;; satisfies? works like Java's instanceof here: it checks whether cluster-state-spec already implements ClusterState.
  ;; If not, cluster-state-spec is treated as a conf map and a new ClusterState is created (solo? = true)
  (let [[solo? cluster-state] (if (satisfies? ClusterState cluster-state-spec)
                                [false cluster-state-spec]
                                [true (mk-distributed-cluster-state cluster-state-spec)])
        ;; map of topology id -> callback: when the data of /assignments/{topology id} changes, the client runs that id's callback
        assignment-info-callback (atom {})
        ;; same idea as assignment-info-callback
        assignment-info-with-version-callback (atom {})
        ;; also the same idea as assignment-info-callback (a map keyed by topology id)
        assignment-version-callback (atom {})
        ;; run when the children of /supervisors change
        supervisors-callback (atom nil)
        ;; run when the children of /assignments change
        assignments-callback (atom nil)
        ;; map of topology id -> callback: when the data of /storms/{topology id} changes, the client runs that id's callback
        storm-base-callback (atom {})
        ;; register adds the callback (fn ...) to cluster-state's callbacks and returns the UUID that identifies it
        state-id (register
                  cluster-state
                  ;; the callback: type is the event type, path is the znode
                  (fn [type path]
                    ;; subtree is the first path segment ("assignments", "storms", "supervisors", ...); args holds the topology id
                    (let [[subtree & args] (tokenize-path path)]
                      ;; condp plays the role of a Java switch
                      (condp = subtree
                        ;; "assignments": if args is empty, the children of /assignments changed, so run assignments-callback;
                        ;; otherwise the data of /assignments/{topology id} changed, so run that id's callback in assignment-info-callback
                        ASSIGNMENTS-ROOT (if (empty? args)
                                           (issue-callback! assignments-callback)
                                           (issue-map-callback! assignment-info-callback (first args)))
                        ;; "supervisors": the children of /supervisors changed, so run supervisors-callback
                        SUPERVISORS-ROOT (issue-callback! supervisors-callback)
                        ;; "storms": the data of /storms/{topology id} changed, so run that id's callback in storm-base-callback
                        STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
                        ;; this should never happen
                        (exit-process! 30 "Unknown callback for subtree " subtree args)))))]
    ;; create the znodes Storm needs to run topologies
    (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE]]
      (mkdirs cluster-state p))
    ;; return an instance implementing the StormClusterState protocol
    (reify
      StormClusterState
      ;; get the children of /assignments; if callback is non-nil, store it in assignments-callback and set a child watch on /assignments
      (assignments
        [this callback]
        (when callback
          (reset! assignments-callback callback))
        (get-children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback)))
      ;; get the data of /assignments/{storm-id}, i.e. the topology's assignment; if callback is non-nil, add it to
      ;; assignment-info-callback and set a data watch on /assignments/{storm-id}
      (assignment-info
        [this storm-id callback]
        (when callback
          (swap! assignment-info-callback assoc storm-id callback))
        (maybe-deserialize (get-data cluster-state (assignment-path storm-id) (not-nil? callback))))
      ;; get the data of /assignments/{storm-id} together with its version; if callback is non-nil, add it to
      ;; assignment-info-with-version-callback and set a data watch on /assignments/{storm-id}
      (assignment-info-with-version
        [this storm-id callback]
        (when callback
          (swap! assignment-info-with-version-callback assoc storm-id callback))
        (let [{data :data version :version}
              (get-data-with-version cluster-state (assignment-path storm-id) (not-nil? callback))]
          {:data (maybe-deserialize data)
           :version version}))
      ;; get the version of /assignments/{storm-id}; if callback is non-nil, add it to assignment-version-callback
      ;; and set a data watch on /assignments/{storm-id}
      (assignment-version
        [this storm-id callback]
        (when callback
          (swap! assignment-version-callback assoc storm-id callback))
        (get-version cluster-state (assignment-path storm-id) (not-nil? callback)))
      ;; get the ids of the topologies running in the cluster, i.e. the children of /storms
      (active-storms
        [this]
        (get-children cluster-state STORMS-SUBTREE false))
      ;; get the ids of the topologies that have heartbeats, i.e. the children of /workerbeats
      (heartbeat-storms
        [this]
        (get-children cluster-state WORKERBEATS-SUBTREE false))
      ;; get the ids of the topologies that have errors, i.e. the children of /errors
      (error-topologies
        [this]
        (get-children cluster-state ERRORS-SUBTREE false))
      ;; get the heartbeat of one worker of storm-id, i.e. the data of /workerbeats/{storm-id}/{node-port}
      (get-worker-heartbeat
        [this storm-id node port]
        (-> cluster-state
            (get-data (workerbeat-path storm-id node port) false)
            maybe-deserialize))
      ;; get the heartbeats of all executors (threads) of storm-id, read from the workers they are assigned to
      (executor-beats
        [this storm-id executor->node+port]
        ;; need to take executor->node+port in explicitly so that we don't run into a situation where a
        ;; long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats
        ;; with an assigned node+port, and only reading executors from that heartbeat that are actually assigned,
        ;; we avoid situations like that
        (let [node+port->executors (reverse-map executor->node+port)
              all-heartbeats (for [[[node port] executors] node+port->executors]
                               (->> (get-worker-heartbeat this storm-id node port)
                                    (convert-executor-beats executors)))]
          (apply merge all-heartbeats)))
      ;; get the children of /supervisors; if callback is non-nil, store it in supervisors-callback and set a child watch on /supervisors
      (supervisors
        [this callback]
        (when callback
          (reset! supervisors-callback callback))
        (get-children cluster-state SUPERVISORS-SUBTREE (not-nil? callback)))
      ;; get the data of /supervisors/{supervisor-id}, i.e. the supervisor's heartbeat
      (supervisor-info
        [this supervisor-id]
        (maybe-deserialize (get-data cluster-state (supervisor-path supervisor-id) false)))
      ;; write a worker's heartbeat
      (worker-heartbeat!
        [this storm-id node port info]
        (set-data cluster-state (workerbeat-path storm-id node port) (Utils/serialize info)))
      ;; delete a worker's heartbeat
      (remove-worker-heartbeat!
        [this storm-id node port]
        (delete-node cluster-state (workerbeat-path storm-id node port)))
      ;; create the node that holds the heartbeats of topology storm-id
      (setup-heartbeats!
        [this storm-id]
        (mkdirs cluster-state (workerbeat-storm-root storm-id)))
      ;; delete the heartbeat node of topology storm-id
      (teardown-heartbeats!
        [this storm-id]
        (try-cause
          (delete-node cluster-state (workerbeat-storm-root storm-id))
          (catch KeeperException e
            (log-warn-error e "Could not teardown heartbeats for " storm-id))))
      ;; delete the error node of topology storm-id
      (teardown-topology-errors!
        [this storm-id]
        (try-cause
          (delete-node cluster-state (error-storm-root storm-id))
          (catch KeeperException e
            (log-warn-error e "Could not teardown errors for " storm-id))))
      ;; store the supervisor's heartbeat in an ephemeral node
      (supervisor-heartbeat!
        [this supervisor-id info]
        (set-ephemeral-node cluster-state (supervisor-path supervisor-id) (Utils/serialize info)))
      ;; create the /storms/{storm-id} node
      (activate-storm!
        [this storm-id storm-base]
        (set-data cluster-state (storm-path storm-id) (Utils/serialize storm-base)))
      ;; update the topology's StormBase object, i.e. the /storms/{storm-id} node
      (update-storm!
        [this storm-id new-elems]
        ;; base is the StormBase of storm-id currently stored on ZooKeeper
        (let [base (storm-base this storm-id nil)
              ;; executors is the map of component name -> parallelism
              executors (:component->executors base)
              ;; new-elems is rebound with the merged parallelism map: update merges the new per-component parallelism into the old map
              new-elems (update new-elems :component->executors (partial merge executors))]
          ;; merge into the StormBase and write it back to /storms/{storm-id}
          (set-data cluster-state (storm-path storm-id)
                    (-> base
                        (merge new-elems)
                        Utils/serialize))))
      ;; get the StormBase of storm-id, i.e. the data of /storms/{storm-id}; if callback is non-nil, add it to
      ;; storm-base-callback and set a data watch on /storms/{storm-id}
      (storm-base
        [this storm-id callback]
        (when callback
          (swap! storm-base-callback assoc storm-id callback))
        (maybe-deserialize (get-data cluster-state (storm-path storm-id) (not-nil? callback))))
      ;; delete the StormBase of storm-id, i.e. the /storms/{storm-id} node
      (remove-storm-base!
        [this storm-id]
        (delete-node cluster-state (storm-path storm-id)))
      ;; update the assignment of storm-id, i.e. the data of /assignments/{storm-id}
      (set-assignment!
        [this storm-id info]
        (set-data cluster-state (assignment-path storm-id) (Utils/serialize info)))
      ;; delete the assignment of storm-id and its StormBase, i.e. the /assignments/{storm-id} and /storms/{storm-id} nodes
      (remove-storm!
        [this storm-id]
        (delete-node cluster-state (assignment-path storm-id))
        (remove-storm-base! this storm-id))
      ;; write a component's error to ZooKeeper
      (report-error
        [this storm-id component-id node port error]
        ;; path is "/errors/{storm-id}/{component-id}"
        (let [path (error-path storm-id component-id)
              ;; data holds the error: time, stack trace, host and port
              data {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port}
              ;; create /errors/{storm-id}/{component-id}
              _ (mkdirs cluster-state path)
              ;; create a sequential child of it and write the error into it
              _ (create-sequential cluster-state (str path "/e") (Utils/serialize data))
              ;; to-kill holds every child except the 10 with the highest sequence numbers
              to-kill (->> (get-children cluster-state path false)
                           (sort-by parse-error-path)
                           reverse
                           (drop 10))]
          ;; delete the nodes in to-kill
          (doseq [k to-kill]
            (delete-node cluster-state (str path "/" k)))))
      ;; get the errors of the given storm-id and component-id, newest first
      (errors
        [this storm-id component-id]
        (let [path (error-path storm-id component-id)
              _ (mkdirs cluster-state path)
              children (get-children cluster-state path false)
              errors (dofor [c children]
                            (let [data (-> (get-data cluster-state (str path "/" c) false)
                                           maybe-deserialize)]
                              (when data
                                (struct TaskError (:error data) (:time-secs data) (:host data) (:port data)))))]
          (->> (filter not-nil? errors)
               (sort-by (comp - :time-secs)))))
      ;; disconnect: first remove our callback from cluster-state's callbacks, then close cluster-state if this instance created it (solo?)
      (disconnect
        [this]
        (unregister cluster-state state-id)
        (when solo?
          (close cluster-state))))))
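The callback registered by mk-storm-cluster-state dispatches on the first path segment and then fires a callback that is stored per tree or per topology id. The article does not show tokenize-path, issue-callback!, or issue-map-callback!, so the sketch below implements them the way I assume they behave. In this version a callback fires at most once and is removed before it runs, which matches ZooKeeper's one-shot watches. Literal strings stand in for ASSIGNMENTS-ROOT, SUPERVISORS-ROOT and STORMS-ROOT, and ex-info replaces exit-process!.

```clojure
(require '[clojure.string :as str])

(defn tokenize-path
  "\"/assignments/topo-1\" -> [\"assignments\" \"topo-1\"]"
  [path]
  (vec (remove empty? (str/split path #"/"))))

;; One-shot semantics (assumed): clear the callback first, then call it.
(defn issue-callback! [cb-atom]
  (let [cb @cb-atom]
    (reset! cb-atom nil)
    (when cb (cb))))

(defn issue-map-callback! [cb-atom id]
  (let [cb (get @cb-atom id)]
    (swap! cb-atom dissoc id)
    (when cb (cb id))))

(def assignments-callback (atom nil))
(def assignment-info-callback (atom {}))
(def supervisors-callback (atom nil))
(def storm-base-callback (atom {}))

(defn dispatch!
  "Body of the (fn [type path] ...) that mk-storm-cluster-state registers."
  [type path]
  (let [[subtree & args] (tokenize-path path)]
    (condp = subtree
      "assignments" (if (empty? args)
                      (issue-callback! assignments-callback)
                      (issue-map-callback! assignment-info-callback (first args)))
      "supervisors" (issue-callback! supervisors-callback)
      "storms" (issue-map-callback! storm-base-callback (first args))
      ;; the real code calls exit-process! here
      (throw (ex-info "Unknown callback for subtree" {:subtree subtree :args args})))))

;; usage: register one-shot callbacks, then simulate three watch events
(def fired (atom []))
(swap! assignment-info-callback assoc "topo-1" (fn [id] (swap! fired conj [:assignment id])))
(reset! supervisors-callback (fn [] (swap! fired conj [:supervisors])))
(dispatch! :node-data-changed "/assignments/topo-1")
(dispatch! :node-children-changed "/supervisors")
(dispatch! :node-data-changed "/assignments/topo-1") ; already consumed: nothing fires
```

Because each callback is consumed when it fires, a caller that wants further notifications has to read again with a callback. That re-read is what sets the next ZooKeeper watch.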
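report-error keeps only the 10 newest errors per component. The pure part of that pipeline can be checked in isolation. In this sketch, parse-error-path is my assumption of how a child name like e0000000012 is turned into a number: the e prefix comes from (str path "/e"), followed by ZooKeeper's 10-digit sequence suffix. errors-to-kill is a hypothetical helper name.

```clojure
(defn parse-error-path
  "Sequence number of a sequential child named like \"e0000000012\" (assumed format)."
  [child]
  (Long/parseLong (subs child 1)))

(defn errors-to-kill
  "Children to delete so that only the keep-n newest remain, mirroring
  report-error's (sort-by parse-error-path) / reverse / (drop 10) pipeline."
  [children keep-n]
  (->> children
       (sort-by parse-error-path)
       reverse
       (drop keep-n)))

;; usage: 12 error nodes in arbitrary order, as get-children may return them
(def children (shuffle (map #(format "e%010d" %) (range 12))))
(def to-kill (vec (errors-to-kill children 10)))
```

Sorting numerically rather than relying on child order matters here, because ZooKeeper's get-children does not guarantee any ordering.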
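update-storm! does not replace the stored component->executors map. Instead it merges the new per-component parallelism into the old map, so components that new-elems does not mention keep their old values. The sketch isolates that step as a pure function. merge-storm-base is a hypothetical name, a plain map stands in for the StormBase object, and the sample topology data is invented. clojure.core/update is used on the assumption that it behaves like the update called in the original.

```clojure
(defn merge-storm-base
  "Sketch of update-storm!'s in-memory step (no ZooKeeper I/O, no serialization)."
  [base new-elems]
  (let [executors (:component->executors base)
        ;; (merge old new): per component, the new parallelism wins;
        ;; if new-elems has no :component->executors, (merge executors nil) keeps the old map
        new-elems (update new-elems :component->executors (partial merge executors))]
    (merge base new-elems)))

;; usage (hypothetical topology)
(def base {:storm-name "wordcount"
           :status {:type :active}
           :component->executors {"spout" 2 "split" 4 "count" 4}})
(def rebalanced (merge-storm-base base {:component->executors {"count" 8}}))
(def deactivated (merge-storm-base base {:status {:type :inactive}}))
```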
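mk-client function in zookeeper.clj
mk-client creates a CuratorFramework instance and registers a CuratorListener on it. Curator calls the listener's eventReceived() when a background operation completes or a watch fires. mk-client's listener reacts only to WATCHED events and passes them on to the watcher function, which is the function bound to :watcher in mk-distributed-cluster-state.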
(defnk mk-client
  [conf servers port
   :root ""
   :watcher default-watcher
   :auth-conf nil]
  (let [fk (Utils/newCurator conf servers port root (when auth-conf (ZookeeperAuthInfo. auth-conf)))]
    (.. fk
        (getCuratorListenable)
        (addListener
          (reify CuratorListener
            (^void eventReceived [this ^CuratorFramework _fk ^CuratorEvent e]
              (when (= (.getType e) CuratorEventType/WATCHED)
                (let [^WatchedEvent event (.getWatchedEvent e)]
                  (watcher (zk-keeper-states (.getState event))
                           (zk-event-types (.getType event))
                           (.getPath event))))))))
    (.start fk)
    fk))
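The listener in mk-client is a small adapter. It drops every Curator event that is not a WATCHED event, translates the ZooKeeper state and event type into keywords through zk-keeper-states and zk-event-types, and calls watcher. Running it for real requires Curator and a ZooKeeper server, so the sketch below replaces CuratorEvent and WatchedEvent with plain maps whose shapes are hypothetical. The two translation tables are my assumption of what zookeeper.clj contains; the real tables are keyed by ZooKeeper enum values, not strings.

```clojure
;; Assumed stand-ins for zk-keeper-states / zk-event-types.
(def zk-keeper-states
  {"Disconnected" :disconnected "SyncConnected" :connected
   "AuthFailed" :auth-failed "Expired" :expired})

(def zk-event-types
  {"None" :none "NodeCreated" :node-created "NodeDeleted" :node-deleted
   "NodeDataChanged" :node-data-changed "NodeChildrenChanged" :node-children-changed})

(defn mk-listener
  "Plays the role of CuratorListener.eventReceived: ignore all but WATCHED
  events, translate the watched event, and hand it to watcher."
  [watcher]
  (fn [curator-event]
    (when (= :watched (:curator-type curator-event))
      (let [event (:watched-event curator-event)]
        (watcher (zk-keeper-states (:state event))
                 (zk-event-types (:type event))
                 (:path event))))))

;; usage
(def calls (atom []))
(def listener (mk-listener (fn [state type path] (swap! calls conj [state type path]))))
(listener {:curator-type :watched
           :watched-event {:state "SyncConnected" :type "NodeChildrenChanged" :path "/supervisors"}})
(listener {:curator-type :create :watched-event nil}) ; background-operation event: ignored
```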
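That concludes the source analysis of how Storm interacts with ZooKeeper. In my view, the most important part is how a "watcher" is attached to the ZooKeeper client, because many Storm features, such as fetching assignments, are built on ZooKeeper's watcher mechanism. Attaching a watcher roughly takes the following steps:
1. mk-distributed-cluster-state creates a ZooKeeper client and uses :watcher to give it a "watcher" function. This watcher only calls the functions in the ClusterState's callbacks collection, so which functions it actually runs is decided by the ClusterState instance.
2. The ClusterState instance provides register for updating callbacks. The instance is passed to mk-storm-cluster-state, which calls register to add a function (fn [type path] ...). That function implements all of the real "watcher" logic.
3. What the function registered in mk-storm-cluster-state does is decided by the StormClusterState instance, and watches on ZooKeeper nodes are also set through that instance. Through the StormClusterState instance we can therefore set "watches" and "callbacks" on the nodes we care about. When a node or its data changes, the ZooKeeper server sends a "notification" to the client, the client's "watcher" function is invoked, and our registered "callback" runs in turn.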
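The three steps above can be traced end to end in a small simulation. This is a sketch with the following assumptions. mk-fake-zk is a hypothetical in-memory stand-in for ZooKeeper that supports only one-shot data watches and calls the watcher synchronously, whereas real ZooKeeper delivers events asynchronously on the client's event thread. Maps of functions stand in for the ClusterState and StormClusterState instances, and only the assignment path is wired up.

```clojure
(require '[clojure.string :as str])

;; Hypothetical in-memory "ZooKeeper": a watched path fires once on the next write.
(defn mk-fake-zk [watcher]
  (let [nodes (atom {}) watches (atom #{})]
    {:get-data (fn [path watch?]
                 (when watch? (swap! watches conj path))
                 (get @nodes path))
     :set-data (fn [path data]
                 (swap! nodes assoc path data)
                 (when (contains? @watches path)
                   (swap! watches disj path)
                   (watcher :connected :node-data-changed path)))}))

;; Step 1: like mk-distributed-cluster-state, the watcher only fans out to callbacks.
(defn mk-cluster-state []
  (let [callbacks (atom {})
        zk (mk-fake-zk (fn [_state type path]
                         (when-not (= :none type)
                           (doseq [cb (vals @callbacks)] (cb type path)))))]
    (assoc zk :register (fn [cb]
                          (let [id (str (java.util.UUID/randomUUID))]
                            (swap! callbacks assoc id cb)
                            id)))))

;; Steps 2 and 3: like mk-storm-cluster-state, register one dispatcher and keep
;; per-topology callbacks; reading with a callback also sets the watch.
(defn mk-storm-state [cs]
  (let [assignment-info-callback (atom {})]
    ((:register cs)
     (fn [_type path]
       (let [[subtree id] (remove empty? (str/split path #"/"))]
         (when (= "assignments" subtree)
           (when-let [cb (get @assignment-info-callback id)]
             (swap! assignment-info-callback dissoc id)
             (cb id))))))
    {:assignment-info (fn [storm-id callback]
                        (when callback
                          (swap! assignment-info-callback assoc storm-id callback))
                        ((:get-data cs) (str "/assignments/" storm-id) (some? callback)))
     :set-assignment! (fn [storm-id info]
                        ((:set-data cs) (str "/assignments/" storm-id) info))}))

;; usage
(def storm (mk-storm-state (mk-cluster-state)))
(def notified (atom []))
((:set-assignment! storm) "topo-1" {:v 1})
(def first-read ((:assignment-info storm) "topo-1" (fn [id] (swap! notified conj id))))
((:set-assignment! storm) "topo-1" {:v 2}) ; fires: notified becomes ["topo-1"]
((:set-assignment! storm) "topo-1" {:v 3}) ; watch already consumed: nobody is notified
```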
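Summary
This part of the source is closely tied to ZooKeeper and involves many ZooKeeper concepts and features, such as "data watches" and "child watches". For ZooKeeper's watcher mechanism, please refer to: . Storm does not use the ZooKeeper API directly. It uses the Curator framework, which simplifies access to ZooKeeper. For the Curator framework, please refer to: .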