欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Zookeeper源码阅读(六) Watcher

程序员文章站 2022-06-01 13:41:34
...

前言

好久没有更新博客了,最近这段时间过得很压抑,终于开始踏上为换工作准备的正轨了,工作又真的很忙而且很琐碎,让自己有点烦恼,希望能早点结束这种状态。

继上次分析了ZK的ACL相关代码后,ZK里非常重要的另一个特性就是Watcher机制了。其实在我看来,就ZK的使用而言,Watche机制是最核心的特性也不为过了!这一篇先简单介绍下watcher相关的实体类和接口。

Watcher机制

在ZK中,客户端可以为znode向服务端注册监听,当相应znode的指定事件被触发时,服务端就会向客户端发送通知,而客户端收到通知后也会执行相应的响应逻辑。整体逻辑如下:

Zookeeper源码阅读(六) Watcher

Watcher特性:

  1. 一次性:每次注册的watcher只能被触发一次,之后就会被删除,如果想继续触发需要注册新的watcher;
  2. 串行性:客户端执行watcher是一个串行过程;
  3. 轻量性:watcher仅包含通知状态、事件类型和节点路径。

Watcher总体框架

看Watcher相关的代码和写这篇博客参考了很多资料,首先列一下Watcher的总体的框架:
Zookeeper源码阅读(六) Watcher
Zookeeper源码阅读(六) Watcher

上面两幅图能很清楚的看到和Watcher有关的接口和类的结构,简单介绍下它们的功能:

Watcher和Event接口:使用过ZK的人对Watcher接口应该很熟悉,内部定义的process方法是watcher被触发是zk调用的方法。同时,watcher内部有内部接口Event,定义了事件的类型(连接状态KeeperState和znode的变化状态EventType);

WatchedEvent接口:连接状态和znode的变化状态以及znode的路径;

WatcherEvent接口:内容和WatchedEvent一模一样,但是是负责网络传输的,由jute生成。

ClientWatchManager接口:根据event返回相应的watcher(Return a set of watchers that should be notified of the event.)。定义了materialize方法。ZKWatchManager实现了这个接口。

代码分析

Watcher接口

Zookeeper源码阅读(六) Watcher

这是watcher接口和两个内部枚举之间的类图关系,详细看下代码:

/**
 * This interface specifies the public interface an event handler class must
 * implement. A ZooKeeper client will get various events from the ZooKeeper
 * server it connects to. An application using such a client handles these
 * events by registering a callback object with the client. The callback object
 * is expected to be an instance of a class that implements Watcher interface.
 * 
 */
@InterfaceAudience.Public
public interface Watcher {

    /**
     * This interface defines the possible states an Event may represent
     */
    @InterfaceAudience.Public
    //内部接口Event,表示事件的状态
    public interface Event {
        /**
         * Enumeration of states the ZooKeeper may be at the event
         */
        @InterfaceAudience.Public
        //内部枚举,连接状态
        public enum KeeperState {
            /** Unused, this state is never generated by the server */
            @Deprecated
            Unknown (-1),

            /** The client is in the disconnected state - it is not connected
             * to any server in the ensemble. */
             //断开连接-0
            Disconnected (0),

            /** Unused, this state is never generated by the server */
            @Deprecated
            NoSyncConnected (1),

            /** The client is in the connected state - it is connected
             * to a server in the ensemble (one of the servers specified
             * in the host connection parameter during ZooKeeper client
             * creation). */
             //连接状态-3
            SyncConnected (3),

            /**
             * Auth failed state
             */
             //认证失败
            AuthFailed (4),

            /**
             * The client is connected to a read-only server, that is the
             * server which is not currently connected to the majority.
             * The only operations allowed after receiving this state is
             * read operations.
             * This state is generated for read-only clients only since
             * read/write clients aren't allowed to connect to r/o servers.
             */
             //连接到read-only的server
            ConnectedReadOnly (5),

            /**
              * SaslAuthenticated: used to notify clients that they are SASL-authenticated,
              * so that they can perform Zookeeper actions with their SASL-authorized permissions.
              */
              //Sasl认证通过-6
            SaslAuthenticated(6),

            /** The serving cluster has expired this session. The ZooKeeper
             * client connection (the session) is no longer valid. You must
             * create a new client connection (instantiate a new ZooKeeper
             * instance) if you with to access the ensemble. */
             //session过期状态-(-112)
            Expired (-112);

            //intValue用来表示当前的连接状态
            private final int intValue;     // Integer representation of value
                                            // for sending over wire
            //构造器
            KeeperState(int intValue) {
                this.intValue = intValue;
            }

            //返回整型值
            public int getIntValue() {
                return intValue;
            }

            //int->状态
            public static KeeperState fromInt(int intValue) {
                switch(intValue) {
                    case   -1: return KeeperState.Unknown;
                    case    0: return KeeperState.Disconnected;
                    case    1: return KeeperState.NoSyncConnected;
                    case    3: return KeeperState.SyncConnected;
                    case    4: return KeeperState.AuthFailed;
                    case    5: return KeeperState.ConnectedReadOnly;
                    case    6: return KeeperState.SaslAuthenticated;
                    case -112: return KeeperState.Expired;

                    default:
                        throw new RuntimeException("Invalid integer value for conversion to KeeperState");
                }
            }
        }

        /**
         * Enumeration of types of events that may occur on the ZooKeeper
         */
        @InterfaceAudience.Public
        //事件类型
        public enum EventType {
            None (-1),//无事件
            NodeCreated (1),//创建节点
            NodeDeleted (2),//删除节点
            NodeDataChanged (3),//节点数据改变
            NodeChildrenChanged (4);//节点的孩子节点发生change

            //时间整型值
            private final int intValue;     // Integer representation of value
                                            // for sending over wire
            //构造器
            EventType(int intValue) {
                this.intValue = intValue;
            }
            //获取整型值
            public int getIntValue() {
                return intValue;
            }
            //int->事件类型
            public static EventType fromInt(int intValue) {
                switch(intValue) {
                    case -1: return EventType.None;
                    case  1: return EventType.NodeCreated;
                    case  2: return EventType.NodeDeleted;
                    case  3: return EventType.NodeDataChanged;
                    case  4: return EventType.NodeChildrenChanged;

                    default:
                        throw new RuntimeException("Invalid integer value for conversion to EventType");
                }
            }           
        }
    }

    //回调方法
    abstract public void process(WatchedEvent event);
}

为了更简洁的说明Event接口中两种状态在实际使用时的情况,用《从zk到paxos》中的表来表示下:

Zookeeper源码阅读(六) Watcher

特别的说明:

  1. NodeDataChanged事件包含znode的dataversion和data本身的修改均会触发Watcher,所以即使用相同内容来更新data,dataversion依然会更新;NodeChildrenDataChanged则指的是子节点列表发生变化,如节点增加或删除时会触发。
  2. AuthFailed和NoAuth是两种状态,前者是auth的模式不对(例如选择了digest1,而不是正确的digest模式),后者是表示auth信息不对。
  3. process这个回调方法非常重要。当zk向客户端发送一个watcher通知时,客户端就会对相应的process方法进行回调,从而实现对事件的处理。

WatchedEvent和WatcherEvent

/**
 *  A WatchedEvent represents a change on the ZooKeeper that a Watcher
 *  is able to respond to.  The WatchedEvent includes exactly what happened,
 *  the current state of the ZooKeeper, and the path of the znode that
 *  was involved in the event.
 */
@InterfaceAudience.Public
public class WatchedEvent {
    final private KeeperState keeperState;//连接状态
    final private EventType eventType;//事件类型
    private String path;//路径

WatchedEvent用来封装服务端事件并传递给Watcher,从而方便回调方法process来处理。

WatcherEvetn从内容含义上来说和WatchedEvent是一样的,只是WatcherEvent实现了Record接口,方便序列化来进行网络传输。

public class WatcherEvent implements Record {
  private int type;//事件类型
  private int state;//连接状态
  private String path;//路径

特别的是,WatchedEvent中的getWrapper方法就是把WatchedEvent包装成WatcherEvent,代码很简单:

/**
 *  Convert WatchedEvent to type that can be sent over network
 */
public WatcherEvent getWrapper() {
    return new WatcherEvent(eventType.getIntValue(), //调用watcherevent的构造函数
                            keeperState.getIntValue(), 
                            path);
}

ClientWatchManager和ZKWatchManager

ClientWatchManager是根据获得的Event得到需要通知的watcher。

public interface ClientWatchManager {
    /**
     * Return a set of watchers that should be notified of the event. The 
     * manager must not notify the watcher(s), however it will update it's 
     * internal structure as if the watches had triggered. The intent being 
     * that the callee is now responsible for notifying the watchers of the 
     * event, possibly at some later time.
     * 
     * @param state event state
     * @param type event type
     * @param path event path
     * @return may be empty set but must not be null
     */
    public Set<Watcher> materialize(Watcher.Event.KeeperState state,
        Watcher.Event.EventType type, String path);

其实现类ZKWatchManager在Zookeeper类中:

/**
 * Manage watchers & handle events generated by the ClientCnxn object.
 * 可以看到这个类的目的是为了处理有clientCnxn获取的events和相关的watcher
 * We are implementing this as a nested class of ZooKeeper so that
 * the public methods will not be exposed as part of the ZooKeeper client
 * API.
 */
private static class ZKWatchManager implements ClientWatchManager {
    private final Map<String, Set<Watcher>> dataWatches =
        new HashMap<String, Set<Watcher>>();//通过getdata设置的watch
    private final Map<String, Set<Watcher>> existWatches =
        new HashMap<String, Set<Watcher>>();//通过exists设置的watch
    private final Map<String, Set<Watcher>> childWatches =
        new HashMap<String, Set<Watcher>>();//通过getchildren设置的watch

    private volatile Watcher defaultWatcher;//client在和zookeeper建立连接时传递的watcher

    final private void addTo(Set<Watcher> from, Set<Watcher> to) {
        if (from != null) {
            to.addAll(from);
        }
    }

    /* (non-Javadoc)
     * @see org.apache.zookeeper.ClientWatchManager#materialize(Event.KeeperState, 
     *                                                        Event.EventType, java.lang.String)
     */
    @Override
    public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                    Watcher.Event.EventType type,
                                    String clientPath)
    {
        //相关的watcher集合
        Set<Watcher> result = new HashSet<Watcher>();

        switch (type) {
        case None://事件类型是none
            result.add(defaultWatcher);//把defaultWatcher加入
            //是否清空,根据zookeeper.disableAutoWatchReset字段进行配置的值、Zookeeper的状态是否为同步连接来判断
            boolean clear = ClientCnxn.getDisableAutoResetWatch() &&
                    state != Watcher.Event.KeeperState.SyncConnected;//

            //datawatches同步块
            synchronized(dataWatches) {
                for(Set<Watcher> ws: dataWatches.values()) {
                    result.addAll(ws);//把所有的watcher加入...含义应该是建立连接后如果连接断开或超时会清空所有的watcher
                }
                if (clear) {//如果要清空
                    dataWatches.clear();
                }
            }

            synchronized(existWatches) {//同上
                for(Set<Watcher> ws: existWatches.values()) {
                    result.addAll(ws);
                }
                if (clear) {
                    existWatches.clear();
                }
            }

            synchronized(childWatches) {//同上
                for(Set<Watcher> ws: childWatches.values()) {
                    result.addAll(ws);
                }
                if (clear) {
                    childWatches.clear();
                }
            }

            return result;
        case NodeDataChanged:
        case NodeCreated://nodedatachange和新建node
            synchronized (dataWatches) {
                addTo(dataWatches.remove(clientPath), result);//把所有clientPath位置的datawatch移除并加入result
            }
            synchronized (existWatches) {
                addTo(existWatches.remove(clientPath), result);//把所有clientPath位置的existwatch移除并加入result
            }
            break;
        case NodeChildrenChanged://node的孩子节点发生改变
            synchronized (childWatches) {
                addTo(childWatches.remove(clientPath), result);//把所有clientPath位置的childwatch移除并加入result
            }
            break;
            case NodeDeleted://node节点被删除
            synchronized (dataWatches) {
                addTo(dataWatches.remove(clientPath), result);//把所有clientPath位置的datawatch移除并加入result
            }
            // XXX This shouldn't be needed, but just in case
            synchronized (existWatches) {//不应该发生,为什么呢?
                Set<Watcher> list = existWatches.remove(clientPath);
                if (list != null) {
                    addTo(existWatches.remove(clientPath), result);
                    LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
                }
            }
            synchronized (childWatches) {
                addTo(childWatches.remove(clientPath), result);//把所有clientPath位置的childwatch移除并加入result
            }
            break;
        default://默认处理
            String msg = "Unhandled watch event type " + type
                + " with state " + state + " on path " + clientPath;
            LOG.error(msg);
            throw new RuntimeException(msg);
        }

        return result;
    }
}

可以看到,这里的代码里体现了watcher的一次性,每次触发之后原来的watcher就会被删除。

WatcherSetEventPair

这个类的目的很简单,就是为了把Event和Watcher绑定起来,此类在clientCnxn中。

private static class WatcherSetEventPair {
    private final Set<Watcher> watchers;//watchers
    private final WatchedEvent event;//事件

    //构造器
    public WatcherSetEventPair(Set<Watcher> watchers, WatchedEvent event) {
        this.watchers = watchers;
        this.event = event;
    }
}

总结

这一篇主要先说一下Watcher相关的一些接口和实体类,但是尽管参考了许多资料,但还是有几处不太理解:

  1. 为什么nodedelete后出发的watcher中exist相关的不会被触发呢?

  2. 为什么需要WatcherSetEventPair 这个类(参考,讲得很好,后面会涉及到这里)

    因为watcher接口process函数需要event参数
    那么在ClientWatchManager完成了根据event找到对应的watchers之后
    就可以直接调用watcher.process(event)了

    但是!!!由于ClientCnxn.EventThread是异步处理的,通过生产消费完成
    在processEvent的函数中,要取出一个数据结构Object,既包含watchers集合,又要包含event,所以就把两者组合在一起出现了WatcherSetEventPair 这个类;

  3. Watcher的注册和触发

  4. materialize方法中none时加入所有的watcher应该是为了在连接状态发生变化时删除所有的watcher。

参考

https://www.jianshu.com/p/4d09cc083571

https://www.cnblogs.com/leesf456/p/6286827.html