前言
好久没有更新博客了,最近这段时间过得很压抑,终于开始踏上为换工作准备的正轨了,工作又真的很忙而且很琐碎,让自己有点烦恼,希望能早点结束这种状态。
继上次分析了ZK的ACL相关代码后,ZK里非常重要的另一个特性就是Watcher机制了。其实在我看来,就ZK的使用而言,Watche机制是最核心的特性也不为过了!这一篇先简单介绍下watcher相关的实体类和接口。
Watcher机制
在ZK中,客户端可以为znode向服务端注册监听,当相应znode的指定事件被触发时,服务端就会向客户端发送通知,而客户端收到通知后也会执行相应的响应逻辑。整体逻辑如下:
Watcher特性:
- 一次性:每次注册的watcher只能被触发一次,之后就会被删除,如果想继续触发需要注册新的watcher;
- 串行性:客户端执行watcher是一个串行过程;
- 轻量性:watcher仅包含通知状态、事件类型和节点路径。
Watcher总体框架
看Watcher相关的代码和写这篇博客参考了很多资料,首先列一下Watcher的总体的框架:
上面两幅图能很清楚的看到和Watcher有关的接口和类的结构,简单介绍下它们的功能:
Watcher和Event接口:使用过ZK的人对Watcher接口应该很熟悉,内部定义的process方法是watcher被触发是zk调用的方法。同时,watcher内部有内部接口Event,定义了事件的类型(连接状态KeeperState和znode的变化状态EventType);
WatchedEvent接口:连接状态和znode的变化状态以及znode的路径;
WatcherEvent接口:内容和WatchedEvent一模一样,但是是负责网络传输的,由jute生成。
ClientWatchManager接口:根据event返回相应的watcher(Return a set of watchers that should be notified of the event.)。定义了materialize方法。ZKWatchManager实现了这个接口。
代码分析
Watcher接口
这是watcher接口和两个内部枚举之间的类图关系,详细看下代码:
/**
* This interface specifies the public interface an event handler class must
* implement. A ZooKeeper client will get various events from the ZooKeeper
* server it connects to. An application using such a client handles these
* events by registering a callback object with the client. The callback object
* is expected to be an instance of a class that implements Watcher interface.
*
*/
@InterfaceAudience.Public
public interface Watcher {
/**
* This interface defines the possible states an Event may represent
*/
@InterfaceAudience.Public
//内部接口Event,表示事件的状态
public interface Event {
/**
* Enumeration of states the ZooKeeper may be at the event
*/
@InterfaceAudience.Public
//内部枚举,连接状态
public enum KeeperState {
/** Unused, this state is never generated by the server */
@Deprecated
Unknown (-1),
/** The client is in the disconnected state - it is not connected
* to any server in the ensemble. */
//断开连接-0
Disconnected (0),
/** Unused, this state is never generated by the server */
@Deprecated
NoSyncConnected (1),
/** The client is in the connected state - it is connected
* to a server in the ensemble (one of the servers specified
* in the host connection parameter during ZooKeeper client
* creation). */
//连接状态-3
SyncConnected (3),
/**
* Auth failed state
*/
//认证失败
AuthFailed (4),
/**
* The client is connected to a read-only server, that is the
* server which is not currently connected to the majority.
* The only operations allowed after receiving this state is
* read operations.
* This state is generated for read-only clients only since
* read/write clients aren't allowed to connect to r/o servers.
*/
//连接到read-only的server
ConnectedReadOnly (5),
/**
* SaslAuthenticated: used to notify clients that they are SASL-authenticated,
* so that they can perform Zookeeper actions with their SASL-authorized permissions.
*/
//Sasl认证通过-6
SaslAuthenticated(6),
/** The serving cluster has expired this session. The ZooKeeper
* client connection (the session) is no longer valid. You must
* create a new client connection (instantiate a new ZooKeeper
* instance) if you with to access the ensemble. */
//session过期状态-(-112)
Expired (-112);
//intValue用来表示当前的连接状态
private final int intValue; // Integer representation of value
// for sending over wire
//构造器
KeeperState(int intValue) {
this.intValue = intValue;
}
//返回整型值
public int getIntValue() {
return intValue;
}
//int->状态
public static KeeperState fromInt(int intValue) {
switch(intValue) {
case -1: return KeeperState.Unknown;
case 0: return KeeperState.Disconnected;
case 1: return KeeperState.NoSyncConnected;
case 3: return KeeperState.SyncConnected;
case 4: return KeeperState.AuthFailed;
case 5: return KeeperState.ConnectedReadOnly;
case 6: return KeeperState.SaslAuthenticated;
case -112: return KeeperState.Expired;
default:
throw new RuntimeException("Invalid integer value for conversion to KeeperState");
}
}
}
/**
* Enumeration of types of events that may occur on the ZooKeeper
*/
@InterfaceAudience.Public
//事件类型
public enum EventType {
None (-1),//无事件
NodeCreated (1),//创建节点
NodeDeleted (2),//删除节点
NodeDataChanged (3),//节点数据改变
NodeChildrenChanged (4);//节点的孩子节点发生change
//时间整型值
private final int intValue; // Integer representation of value
// for sending over wire
//构造器
EventType(int intValue) {
this.intValue = intValue;
}
//获取整型值
public int getIntValue() {
return intValue;
}
//int->事件类型
public static EventType fromInt(int intValue) {
switch(intValue) {
case -1: return EventType.None;
case 1: return EventType.NodeCreated;
case 2: return EventType.NodeDeleted;
case 3: return EventType.NodeDataChanged;
case 4: return EventType.NodeChildrenChanged;
default:
throw new RuntimeException("Invalid integer value for conversion to EventType");
}
}
}
}
//回调方法
abstract public void process(WatchedEvent event);
}
为了更简洁的说明Event接口中两种状态在实际使用时的情况,用《从zk到paxos》中的表来表示下:
特别的说明:
- NodeDataChanged事件包含znode的dataversion和data本身的修改均会触发Watcher,所以即使用相同内容来更新data,dataversion依然会更新;NodeChildrenDataChanged则指的是子节点列表发生变化,如节点增加或删除时会触发。
- AuthFailed和NoAuth是两种状态,前者是auth的模式不对(例如选择了digest1,而不是正确的digest模式),后者是表示auth信息不对。
- process这个回调方法非常重要。当zk向客户端发送一个watcher通知时,客户端就会对相应的process方法进行回调,从而实现对事件的处理。
WatchedEvent和WatcherEvent
/**
* A WatchedEvent represents a change on the ZooKeeper that a Watcher
* is able to respond to. The WatchedEvent includes exactly what happened,
* the current state of the ZooKeeper, and the path of the znode that
* was involved in the event.
*/
@InterfaceAudience.Public
public class WatchedEvent {
final private KeeperState keeperState;//连接状态
final private EventType eventType;//事件类型
private String path;//路径
WatchedEvent用来封装服务端事件并传递给Watcher,从而方便回调方法process来处理。
WatcherEvetn从内容含义上来说和WatchedEvent是一样的,只是WatcherEvent实现了Record接口,方便序列化来进行网络传输。
public class WatcherEvent implements Record {
private int type;//事件类型
private int state;//连接状态
private String path;//路径
特别的是,WatchedEvent中的getWrapper方法就是把WatchedEvent包装成WatcherEvent,代码很简单:
/**
* Convert WatchedEvent to type that can be sent over network
*/
public WatcherEvent getWrapper() {
return new WatcherEvent(eventType.getIntValue(), //调用watcherevent的构造函数
keeperState.getIntValue(),
path);
}
ClientWatchManager和ZKWatchManager
ClientWatchManager是根据获得的Event得到需要通知的watcher。
public interface ClientWatchManager {
/**
* Return a set of watchers that should be notified of the event. The
* manager must not notify the watcher(s), however it will update it's
* internal structure as if the watches had triggered. The intent being
* that the callee is now responsible for notifying the watchers of the
* event, possibly at some later time.
*
* @param state event state
* @param type event type
* @param path event path
* @return may be empty set but must not be null
*/
public Set<Watcher> materialize(Watcher.Event.KeeperState state,
Watcher.Event.EventType type, String path);
其实现类ZKWatchManager在Zookeeper类中:
/**
* Manage watchers & handle events generated by the ClientCnxn object.
* 可以看到这个类的目的是为了处理有clientCnxn获取的events和相关的watcher
* We are implementing this as a nested class of ZooKeeper so that
* the public methods will not be exposed as part of the ZooKeeper client
* API.
*/
private static class ZKWatchManager implements ClientWatchManager {
private final Map<String, Set<Watcher>> dataWatches =
new HashMap<String, Set<Watcher>>();//通过getdata设置的watch
private final Map<String, Set<Watcher>> existWatches =
new HashMap<String, Set<Watcher>>();//通过exists设置的watch
private final Map<String, Set<Watcher>> childWatches =
new HashMap<String, Set<Watcher>>();//通过getchildren设置的watch
private volatile Watcher defaultWatcher;//client在和zookeeper建立连接时传递的watcher
final private void addTo(Set<Watcher> from, Set<Watcher> to) {
if (from != null) {
to.addAll(from);
}
}
/* (non-Javadoc)
* @see org.apache.zookeeper.ClientWatchManager#materialize(Event.KeeperState,
* Event.EventType, java.lang.String)
*/
@Override
public Set<Watcher> materialize(Watcher.Event.KeeperState state,
Watcher.Event.EventType type,
String clientPath)
{
//相关的watcher集合
Set<Watcher> result = new HashSet<Watcher>();
switch (type) {
case None://事件类型是none
result.add(defaultWatcher);//把defaultWatcher加入
//是否清空,根据zookeeper.disableAutoWatchReset字段进行配置的值、Zookeeper的状态是否为同步连接来判断
boolean clear = ClientCnxn.getDisableAutoResetWatch() &&
state != Watcher.Event.KeeperState.SyncConnected;//
//datawatches同步块
synchronized(dataWatches) {
for(Set<Watcher> ws: dataWatches.values()) {
result.addAll(ws);//把所有的watcher加入...含义应该是建立连接后如果连接断开或超时会清空所有的watcher
}
if (clear) {//如果要清空
dataWatches.clear();
}
}
synchronized(existWatches) {//同上
for(Set<Watcher> ws: existWatches.values()) {
result.addAll(ws);
}
if (clear) {
existWatches.clear();
}
}
synchronized(childWatches) {//同上
for(Set<Watcher> ws: childWatches.values()) {
result.addAll(ws);
}
if (clear) {
childWatches.clear();
}
}
return result;
case NodeDataChanged:
case NodeCreated://nodedatachange和新建node
synchronized (dataWatches) {
addTo(dataWatches.remove(clientPath), result);//把所有clientPath位置的datawatch移除并加入result
}
synchronized (existWatches) {
addTo(existWatches.remove(clientPath), result);//把所有clientPath位置的existwatch移除并加入result
}
break;
case NodeChildrenChanged://node的孩子节点发生改变
synchronized (childWatches) {
addTo(childWatches.remove(clientPath), result);//把所有clientPath位置的childwatch移除并加入result
}
break;
case NodeDeleted://node节点被删除
synchronized (dataWatches) {
addTo(dataWatches.remove(clientPath), result);//把所有clientPath位置的datawatch移除并加入result
}
// XXX This shouldn't be needed, but just in case
synchronized (existWatches) {//不应该发生,为什么呢?
Set<Watcher> list = existWatches.remove(clientPath);
if (list != null) {
addTo(existWatches.remove(clientPath), result);
LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
}
}
synchronized (childWatches) {
addTo(childWatches.remove(clientPath), result);//把所有clientPath位置的childwatch移除并加入result
}
break;
default://默认处理
String msg = "Unhandled watch event type " + type
+ " with state " + state + " on path " + clientPath;
LOG.error(msg);
throw new RuntimeException(msg);
}
return result;
}
}
可以看到,这里的代码里体现了watcher的一次性,每次触发之后原来的watcher就会被删除。
WatcherSetEventPair
这个类的目的很简单,就是为了把Event和Watcher绑定起来,此类在clientCnxn中。
private static class WatcherSetEventPair {
private final Set<Watcher> watchers;//watchers
private final WatchedEvent event;//事件
//构造器
public WatcherSetEventPair(Set<Watcher> watchers, WatchedEvent event) {
this.watchers = watchers;
this.event = event;
}
}
总结
这一篇主要先说一下Watcher相关的一些接口和实体类,但是尽管参考了许多资料,但还是有几处不太理解:
为什么nodedelete后出发的watcher中exist相关的不会被触发呢?
-
为什么需要WatcherSetEventPair 这个类(参考,讲得很好,后面会涉及到这里)
因为watcher接口process函数需要event参数
那么在ClientWatchManager完成了根据event找到对应的watchers之后
就可以直接调用watcher.process(event)了但是!!!由于ClientCnxn.EventThread是异步处理的,通过生产消费完成
在processEvent的函数中,要取出一个数据结构Object,既包含watchers集合,又要包含event,所以就把两者组合在一起出现了WatcherSetEventPair 这个类; Watcher的注册和触发
materialize方法中none时加入所有的watcher应该是为了在连接状态发生变化时删除所有的watcher。
参考
https://www.jianshu.com/p/4d09cc083571
https://www.cnblogs.com/leesf456/p/6286827.html