欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Zookeeper-会话初始化阶段的源码分析

程序员文章站 2022-07-13 15:19:22
...

调用Zookeeper的构造方法来实例化一个Zookeeper。我们以一个构造方法进行一步一步的分析。

 

下面是一个Zookeeper的构造方法:

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean canBeReadOnly, HostProvider aHostProvider)
            throws IOException {
        LOG.info("Initiating client connection, connectString=" + connectString
                + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

        watchManager = defaultWatchManager();
        watchManager.defaultWatcher = watcher;

        ConnectStringParser connectStringParser = new ConnectStringParser(
                connectString);
        hostProvider = aHostProvider;

        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), canBeReadOnly);
        cnxn.start();
    }

 

在《Zoopkeeper-会话创建流程》文章中已经介绍了初始化阶段所做的事情。这里就从每行代码开始分析。

第一行代码:

LOG.info("Initiating client connection, connectString=" + connectString
                + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

  这了一看就是在进行日志的记录。日志我们每个项目都在,而且定义LOG的方式也见过很多。这里看看zookeeper是如何创建的,并且思考why!跟踪的代码如下:

private static final Logger LOG;
    static {
        //Keep these two lines together to keep the initialization order explicit
        LOG = LoggerFactory.getLogger(ZooKeeper.class);
        Environment.logEnv("Client environment:", LOG);
    }

 

LOG的定义是static final 形式的。那么为什么会这样定义呢?那就需要知道这样定义的好处。网上找到了2片文章写的不错。地址:

  • http://www.importnew.com/7553.html
  • http://www.importnew.com/7440.html

总结一下就是这样定义可以提高性能。

 

 

这里除了初始化一个LOG还多了一行代码:Environment.logEnv("Client environment:", LOG);这行代码会对客户端一些环境参数进行日志记录。哪些是环境参数呢?譬如客户端ip地址、内存。那么java中怎么获取这些信息。平时很少会关注这样的方式。今天看源码就当扩展一下对java基础类的了解。

put(l, "host.name",InetAddress.getLocalHost().getCanonicalHostName()); 获取主机地址
// Get memory information.
Runtime runtime = Runtime.getRuntime();
int mb = 1024 * 1024;
put(l, "os.memory.free",Long.toString(runtime.freeMemory() / mb) + "MB"); jvm空闲的内存
put(l, "os.memory.max",Long.toString(runtime.maxMemory() / mb) + "MB");jvm能获取的最大内存
put(l, "os.memory.total",Long.toString(runtime.totalMemory() / mb) + "MB");jvm当前占用的内存

 

参考文档:http://7sunet.iteye.com/blog/285007

这行代码引发了我的一个思考:当我们开发的不是web程序时,是否需要对一些环境信息进行一些记录呢。

 

接下来的2行代码:

watchManager = defaultWatchManager();
watchManager.defaultWatcher = watcher;

 

创建了一个ZKWatchManager对象,然后把watcher赋值给了ZKWatchManager对象的defaultWatcher 属性。ZKWatchManager实现了ClientWatchManager接口。也就是创建了一个客户端Watcher管理器。

 

接下来的3行代码:

 

ConnectStringParser connectStringParser = new ConnectStringParser(
                connectString);

 首先创建了一个ConnectStringParser类,从字面上可以理解这个类是一个连接字符串的解析类。跟踪代码去了解一下这类的真是面貌:

首先印入眼帘的就是这个类是final类型的。

/**
 * A parser for ZooKeeper Client connect strings.
 * 
 * This class is not meant to be seen or used outside of ZooKeeper itself.
 * 
 * The chrootPath member should be replaced by a Path object in issue
 * ZOOKEEPER-849.
 * 
 * @see org.apache.zookeeper.ZooKeeper
 */
public final class ConnectStringParser 

 注解写的很清楚,这个类不能被ZooKeeper 以外的类看见或者使用。这个类具体解析连接字符串的逻辑是在构造函数中编写的。这里对字符串的处理并非使用了String自带的split方法。而是使用Zookeeper项目中StringUtils类。看来对字符串的处理是每个项目都必须要做的啊。我见过的项目几乎都会自己写一个名叫StringUtils的类。我们看看Zookeeper怎么实现split方法的。

 /**
     * This method returns an immutable List<String>, but different from String's split()
     * it trims the results in the input String, and removes any empty string from
     * the resulting List.
     *
     */
    public static List<String> split(String value, String separator) {
        String[] splits = value.split(separator);
        List<String> results = new ArrayList<String>();
        for (int i = 0; i < splits.length; i++) {
            splits[i] = splits[i].trim();
            if (splits[i].length() > 0) {
               results.add(splits[i]);
            }
        }
        return Collections.unmodifiableList(results);
    }

 从注解上看只是对字符串进行了一些自己业务需求的处理。底层用到的还是String的split方法。但是有一点是值的关注的。那就是返回的是immutable 集合。记得在《重构》这本书有一种思想叫做:Encapsulate Collection (封装集群) 。也就是这个集合在返回之后是不允许再修改的。那么调用者怎么知道不能修改呢?所以这里使用了Collections.unmodifiableList方法返回一个不可变的集合。如果修改则会抛出java.lang.UnsupportedOperationException。使用这个方式之后代码变的更加优美,更加健壮。

我们再回到ConnectStringParser 类中,最终地址是放到了一个集合中:serverAddresses。而这个集合的初始化时这样的:

private final ArrayList<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>();

 

再次用到了final字段。目的应该就是上面文章中分析的。然而这里使用到了InetSocketAddress。对于IP地址和端口的存放,Zookeeper是存放在了jdk net包中的InetSocketAddress。存放的代码为:serverAddresses.add(InetSocketAddress.createUnresolved(host, port)); 如果是我的话,我可能就直接自己定义一个类表示serverAddress了。我理解使用这个类的好处有:

  • 这个类会对IP和端口做响应的校验。保证了serverAddress的正确性。
  • 减少了项目的复杂度。jdk有合适的类尽量使用现成的类而不是什么都自己去实现创造。

 接下来的4行代码:

 

hostProvider = aHostProvider;

在ZooKeeper的构造函数中最后一个参数是传递一个HostProvider(接口)。然而一般不会自己去创建,ZooKeeper自己有默认的创建实现: 

 

// default hostprovider
    private static HostProvider createDefaultHostProvider(String connectString) {
        return new StaticHostProvider(
                new ConnectStringParser(connectString).getServerAddresses());
    }

 最终返回的是StaticHostProvider(HostProvider的实现类)。这个构造方法实现了些什么呢,我们继续跟踪看看:

/**
 * Most simple HostProvider, resolves only on instantiation.
 * 
 */
public final class StaticHostProvider implements HostProvider

首先这个类也是final 类型的。其次是我们要看的构造方法:
public StaticHostProvider(Collection<InetSocketAddress> serverAddresses) {
       sourceOfRandomness = new Random(System.currentTimeMillis() ^ this.hashCode());

        this.serverAddresses = resolveAndShuffle(serverAddresses);
        if (this.serverAddresses.isEmpty()) {
            throw new IllegalArgumentException(
                    "A HostProvider may not be empty!");
        }       
        currentIndex = -1;
        lastIndex = -1;              
    }

 这里构造方法大致可以理解为:构造一个随机种子,然后对服务器地址进行解析和打乱操作:

private List<InetSocketAddress> resolveAndShuffle(Collection<InetSocketAddress> serverAddresses) {
        List<InetSocketAddress> tmpList = new ArrayList<InetSocketAddress>(serverAddresses.size());       
        for (InetSocketAddress address : serverAddresses) {
            try {
                InetAddress ia = address.getAddress();
                String addr = (ia != null) ? ia.getHostAddress() : address.getHostString();
                InetAddress resolvedAddresses[] = InetAddress.getAllByName(addr);
                for (InetAddress resolvedAddress : resolvedAddresses) {
                    InetAddress taddr = InetAddress.getByAddress(address.getHostString(), resolvedAddress.getAddress());
                    tmpList.add(new InetSocketAddress(taddr, address.getPort()));
                }
            } catch (UnknownHostException ex) {
                LOG.warn("No IP address found for server: {}", address, ex);
            }
        }
        Collections.shuffle(tmpList, sourceOfRandomness);
        return tmpList;
    } 

 在StaticHostProvider的成员变量中,有一个地方让我在意了一下:

private final List<InetSocketAddress> oldServers = new ArrayList<InetSocketAddress>(
            5);

 代码非常的简单。就是创建一个集合。但是它设置了大小。那么问题来了,我们一般创建ArrayList不会去设置大小。那么设置大小和不设置大小有什么区别呢?我们看看集合的构造方法。

public ArrayList(int initialCapacity) {
        super();
        if (initialCapacity < 0)
            throw new IllegalArgumentException("Illegal Capacity: "+
                                               initialCapacity);
        this.elementData = new Object[initialCapacity];
    }

    /**
     * Constructs an empty list with an initial capacity of ten.
     */
    public ArrayList() {
        this(10);
    }

 上面的代码解释了区别。默认是10.然后就这么点不同吗?我在代码中发现了一段如何扩充大小的代码:

/**
     * The maximum size of array to allocate.
     * Some VMs reserve some header words in an array.
     * Attempts to allocate larger arrays may result in
     * OutOfMemoryError: Requested array size exceeds VM limit
     */
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    /**
     * Increases the capacity to ensure that it can hold at least the
     * number of elements specified by the minimum capacity argument.
     *
     * @param minCapacity the desired minimum capacity
     */
    private void grow(int minCapacity) {
        // overflow-conscious code
        int oldCapacity = elementData.length;
        int newCapacity = oldCapacity + (oldCapacity >> 1);
        if (newCapacity - minCapacity < 0)
            newCapacity = minCapacity;
        if (newCapacity - MAX_ARRAY_SIZE > 0)
            newCapacity = hugeCapacity(minCapacity);
        // minCapacity is usually close to size, so this is a win:
        elementData = Arrays.copyOf(elementData, newCapacity);
    }

    private static int hugeCapacity(int minCapacity) {
        if (minCapacity < 0) // overflow
            throw new OutOfMemoryError();
        return (minCapacity > MAX_ARRAY_SIZE) ?
            Integer.MAX_VALUE :
            MAX_ARRAY_SIZE;
    }

 

 结合注释我们可以理解:

  • 当放入集合的元素超出了容器大小的时候,容器是按照构造方法中的initialCapacity参数的倍数来增加的。
  • 容器的大小是有阀值的,大小就是Integer.MAX_VALUE;
  • Integer.MAX_VALUE=2147483647  int 的最大值。
  • MAX_ARRAY_SIZE 数组最大的容量=Integer.MAX_VALUE - 8; 解释是一些vm会保留一些数组的头信息。
  • 分配一个大的容器可能会出现OutOfMemoryError异常。
  • 这里为什么会减去8.我个人的猜测是。数组是java对象。在Java中,一个空Object对象的大小是8byte。参考文章:http://hllvm.group.iteye.com/group/wiki/2860-JVM
看了结构的代码我们回过头来看看StaticHostProvider的成员变量oldServers 为什么容器初始值是5.我的猜测是,Zookeeper的服务器如果集群的话最少是3台。那么我认为5台是比较合理的架构。纯属个人的猜测。
接下来的5行代码:
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), sessionId, sessionPasswd, canBeReadOnly);
 getClientCnxnSocket()方法返回一个ClientCnxnSocket对象。ClientCnxnSocket是一个抽象类。代码如下:
private static ClientCnxnSocket getClientCnxnSocket() throws IOException {
        String clientCnxnSocketName = System
                .getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
        if (clientCnxnSocketName == null) {
            clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
        }
        try {
            return (ClientCnxnSocket) Class.forName(clientCnxnSocketName)
                    .newInstance();
        } catch (Exception e) {
            IOException ioe = new IOException("Couldn't instantiate "
                    + clientCnxnSocketName);
            ioe.initCause(e);
            throw ioe;
        }
    }
 

代码大致的逻辑就是先从系统中获取clientCnxnSocketName。如果没有就使用ClientCnxnSocketNIO类。然后通过反射实例化一个对象。

 

/**
 * This class manages the socket i/o for the client. ClientCnxn maintains a list
 * of available servers to connect to and "transparently" switches servers it is
 * connected to as needed.
 *
 */
public class ClientCnxn 

  

类注释说的很清楚。这个类是管理网络通信的。

再看看我们调用的构造方法:

 

/**
     * Creates a connection object. The actual network connect doesn't get
     * established until needed. The start() instance method must be called
     * subsequent to construction.
     *
     * @param chrootPath - the chroot of this client. Should be removed from this Class in ZOOKEEPER-838
     * @param hostProvider
     *                the list of ZooKeeper servers to connect to
     * @param sessionTimeout
     *                the timeout for connections.
     * @param zooKeeper
     *                the zookeeper object that this connection is related to.
     * @param watcher watcher for this connection
     * @param clientCnxnSocket
     *                the socket implementation used (e.g. NIO/Netty)
     * @param sessionId session id if re-establishing session
     * @param sessionPasswd session passwd if re-establishing session
     * @param canBeReadOnly
     *                whether the connection is allowed to go to read-only
     *                mode in case of partitioning
     * @throws IOException
     */
   public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
            ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
            long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
        this.zooKeeper = zooKeeper;
        this.watcher = watcher;
        this.sessionId = sessionId;
        this.sessionPasswd = sessionPasswd;
        this.sessionTimeout = sessionTimeout;
        this.hostProvider = hostProvider;
        this.chrootPath = chrootPath;

        connectTimeout = sessionTimeout / hostProvider.size();
        readTimeout = sessionTimeout * 2 / 3;
        readOnly = canBeReadOnly;

        sendThread = new SendThread(clientCnxnSocket);
        eventThread = new EventThread();
    }

 

构造方法的注释介绍,这里只是创建了一个连接对象。实际的网络连接并没有建立。随后必须调用start()方法。这个构造方法就是初始化一些参数。创建了2个线程。其中也显示了连接超时、读取超时的时间设定。连接超时时间随着服务器数量的增多是在减少的。而读取时间则是会话时间的2/3。第一个线程类的类注释:

/**
     * This class services the outgoing request queue and generates the heart
     * beats. It also spawns the ReadThread.
     */
    class SendThread extends ZooKeeperThread {

 

说明了这个类是服务于 outgoing request queue 和生成心跳。构造函数代码如下:

SendThread(ClientCnxnSocket clientCnxnSocket) {
            super(makeThreadName("-SendThread()"));
            state = States.CONNECTING;
            this.clientCnxnSocket = clientCnxnSocket;
            setDaemon(true);
        }

 这段代码的亮点就在最后一句。这个线程是守护线程。即java中的后台线程,是Thread实例设置了setDaemon(true),即将daemon属性设置为了true。 当程序中没有活动的前台线程时,后台线程会被jvm中断,退出程序,这是后台线程和普通线程的唯一区别。需要注意将线程设置为daemon的时机必须在其运行之前。所以注释解释了:创建对象后然后再去启动start()方法。

 EventThread() {
            super(makeThreadName("-EventThread"));
            setDaemon(true);
        }

 上面的代码可以看出EventThread同样是守护线程。

 

致辞会话的初始化流程分析完了。最后还遗留了一个类,那就是ZooKeeperThread。SendThread和EventThread都继承了这个类。我们来看看这个类是做什么的:

/**
 * This is the main class for catching all the uncaught exceptions thrown by the
 * threads.
 */
public class ZooKeeperThread extends Thread {

    private static final Logger LOG = LoggerFactory
            .getLogger(ZooKeeperThread.class);

    private UncaughtExceptionHandler  uncaughtExceptionalHandler = new UncaughtExceptionHandler() {

        @Override
        public void uncaughtException(Thread t, Throwable e) {
            handleException(t.getName(), e);
        }
    };

    public ZooKeeperThread(String threadName) {
        super(threadName);
        setUncaughtExceptionHandler(uncaughtExceptionalHandler);
    }

    /**
     * This will be used by the uncaught exception handler and just log a
     * warning message and return.
     * 
     * @param thName
     *            - thread name
     * @param e
     *            - exception object
     */
    protected void handleException(String thName, Throwable e) {
        LOG.warn("Exception occured from thread {}", thName, e);
    }
}

 

看到上面的代码让我理解了为什么Zookeeper会自己定义一个线程类。目的就是用来记录在线程执行过程中出现的异常。使用了线程的UncaughtExceptionHandler 类。我自己在以前从来没有重视过这点。虽然知道线程的run方法不对抛出异常,但是没有思考如果出现异常后是否需要做一些处理呢。在这里只是进行了日志的记录。但是通过这个方法我们可以进行很多善后操作。譬如:重启、回收一些资源、关闭一些资源等。又学到了一点点细节。

相关标签: zookeeper