Skip to content

客户端启动

1. 客户端初始化流程

Alt text

2. 查看脚本,分析程序入口

ZkCli.sh

sh
#!/usr/bin/env bash

# use POSIX interface, symlink is followed automatically
ZOOBIN="${BASH_SOURCE-$0}"
ZOOBIN="$(dirname "${ZOOBIN}")"
ZOOBINDIR="$(cd "${ZOOBIN}"; pwd)"

if [ -e "$ZOOBIN/../libexec/zkEnv.sh" ]; then
  . "$ZOOBINDIR"/../libexec/zkEnv.sh
else
  . "$ZOOBINDIR"/zkEnv.sh
fi

ZOO_LOG_FILE=zookeeper-$USER-cli-$HOSTNAME.log

"$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.log.file=${ZOO_LOG_FILE}" \
     -cp "$CLASSPATH" $CLIENT_JVMFLAGS $JVMFLAGS \
     org.apache.zookeeper.ZooKeeperMain "$@"

可见ZooKeeperMain为客户端入口。
程序的入口main()方法

java
public static void main(String[] args) throws IOException, InterruptedException {
    ZooKeeperMain main = new ZooKeeperMain(args);
    main.run();
}

2. 创建ZookeeperMain

  1. ZooKeeperMain的构造方法
java
public ZooKeeperMain(String[] args) throws IOException, InterruptedException {
    cl.parseOptions(args);
    System.out.println("Connecting to " + cl.getOption("server"));
    connectToZK(cl.getOption("server"));
}
  1. 远程连接zk服务器
java
protected void connectToZK(String newHost) throws InterruptedException, IOException {
    if (zk != null && zk.getState().isAlive()) {
        zk.close();
    }

    host = newHost;
    boolean readOnly = cl.getOption("readonly") != null;
    if (cl.getOption("secure") != null) {
        System.setProperty(ZKClientConfig.SECURE_CLIENT, "true");
        System.out.println("Secure connection is enabled");
    }

    ZKClientConfig clientConfig = null;

    if (cl.getOption("client-configuration") != null) {
        try {
            clientConfig = new ZKClientConfig(cl.getOption("client-configuration"));
        } catch (QuorumPeerConfig.ConfigException e) {
            e.printStackTrace();
            ServiceUtils.requestSystemExit(ExitCode.INVALID_INVOCATION.getValue());
        }
    }

    if (cl.getOption("waitforconnection") != null) {
        connectLatch = new CountDownLatch(1);
    }

    int timeout = Integer.parseInt(cl.getOption("timeout"));
    zk = new ZooKeeperAdmin(host, timeout, new MyWatcher(), readOnly, clientConfig);
    if (connectLatch != null) {
        if (!connectLatch.await(timeout, TimeUnit.MILLISECONDS)) {
            zk.close();
            throw new IOException(KeeperException.create(KeeperException.Code.CONNECTIONLOSS));
        }
    }
}
  1. 创建ZooKeeperAdmin对象
java
public ZooKeeperAdmin(
    String connectString,
    int sessionTimeout,
    Watcher watcher,
    boolean canBeReadOnly,
    ZKClientConfig conf) throws IOException {
    super(connectString, sessionTimeout, watcher, canBeReadOnly, conf);
}
  1. ZooKeeperAdmin调用父类构造方法 ZooKeeper.java
java
public ZooKeeper(
    String connectString,
    int sessionTimeout,
    Watcher watcher,
    boolean canBeReadOnly,
    HostProvider hostProvider,
    ZKClientConfig clientConfig
) throws IOException {
    LOG.info(
        "Initiating client connection, connectString={} sessionTimeout={} watcher={}",
        connectString,
        sessionTimeout,
        watcher);

    this.clientConfig = clientConfig != null ? clientConfig : new ZKClientConfig();
    this.hostProvider = hostProvider;
    ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
    // 客户端与服务器端通信的终端
    cnxn = createConnection(
        connectStringParser.getChrootPath(),
        hostProvider,
        sessionTimeout,
        this.clientConfig,
        watcher,
        getClientCnxnSocket(),
        canBeReadOnly);
    cnxn.start();
}

3. 初始化监听器

java
protected void connectToZK(String newHost) throws InterruptedException, IOException {
    if (zk != null && zk.getState().isAlive()) {
        zk.close();
    }

    host = newHost;
    boolean readOnly = cl.getOption("readonly") != null;
    if (cl.getOption("secure") != null) {
        System.setProperty(ZKClientConfig.SECURE_CLIENT, "true");
        System.out.println("Secure connection is enabled");
    }

    ZKClientConfig clientConfig = null;

    if (cl.getOption("client-configuration") != null) {
        try {
            clientConfig = new ZKClientConfig(cl.getOption("client-configuration"));
        } catch (QuorumPeerConfig.ConfigException e) {
            e.printStackTrace();
            ServiceUtils.requestSystemExit(ExitCode.INVALID_INVOCATION.getValue());
        }
    }

    if (cl.getOption("waitforconnection") != null) {
        connectLatch = new CountDownLatch(1);
    }

    int timeout = Integer.parseInt(cl.getOption("timeout"));
    zk = new ZooKeeperAdmin(host, timeout, new MyWatcher(), readOnly, clientConfig);
    if (connectLatch != null) {
        if (!connectLatch.await(timeout, TimeUnit.MILLISECONDS)) {
            zk.close();
            throw new IOException(KeeperException.create(KeeperException.Code.CONNECTIONLOSS));
        }
    }
}
private class MyWatcher implements Watcher {

    public void process(WatchedEvent event) {
        if (getPrintWatches()) {
            ZooKeeperMain.printMessage("WATCHER::");
            ZooKeeperMain.printMessage(event.toString());
        }
        if (connectLatch != null) {
            // connection success
            if (event.getType() == Event.EventType.None
                && event.getState() == Event.KeeperState.SyncConnected) {
                connectLatch.countDown();
            }
        }
    }

}

4. 解析连接地址

java
public ConnectStringParser(String connectString) {
    // parse out chroot, if any
    int off = connectString.indexOf('/');
    if (off >= 0) {
        String chrootPath = connectString.substring(off);
        // ignore "/" chroot spec, same as null
        if (chrootPath.length() == 1) {
            this.chrootPath = null;
        } else {
            PathUtils.validatePath(chrootPath);
            this.chrootPath = chrootPath;
        }
        connectString = connectString.substring(0, off);
    } else {
        this.chrootPath = null;
    }

    List<String> hostsList = split(connectString, ",");
    for (String host : hostsList) {
        int port = DEFAULT_PORT;
        String[] hostAndPort = NetUtils.getIPV6HostAndPort(host);
        if (hostAndPort.length != 0) {
            host = hostAndPort[0];
            if (hostAndPort.length == 2) {
                port = Integer.parseInt(hostAndPort[1]);
            }
        } else {
            int pidx = host.lastIndexOf(':');
            if (pidx >= 0) {
                // otherwise : is at the end of the string, ignore
                if (pidx < host.length() - 1) {
                    port = Integer.parseInt(host.substring(pidx + 1));
                }
                host = host.substring(0, pidx);
            }
        }
        serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
    }
}

5. 创建通信

java
public ZooKeeper(
    String connectString,
    int sessionTimeout,
    Watcher watcher,
    boolean canBeReadOnly,
    HostProvider hostProvider,
    ZKClientConfig clientConfig
) throws IOException {
    LOG.info(
        "Initiating client connection, connectString={} sessionTimeout={} watcher={}",
        connectString,
        sessionTimeout,
        watcher);

    this.clientConfig = clientConfig != null ? clientConfig : new ZKClientConfig();
    this.hostProvider = hostProvider;
    ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
    // 客户端与服务器端通信的终端
    cnxn = createConnection(
        connectStringParser.getChrootPath(),
        hostProvider,
        sessionTimeout,
        this.clientConfig,
        watcher,
        getClientCnxnSocket(),
        canBeReadOnly);
    cnxn.start();
}
  1. 创建socket通信
java
private ClientCnxnSocket getClientCnxnSocket() throws IOException {
    String clientCnxnSocketName = getClientConfig().getProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
    if (clientCnxnSocketName == null || clientCnxnSocketName.equals(ClientCnxnSocketNIO.class.getSimpleName())) {
        clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
    } else if (clientCnxnSocketName.equals(ClientCnxnSocketNetty.class.getSimpleName())) {
        clientCnxnSocketName = ClientCnxnSocketNetty.class.getName();
    }
    // 通过反射获取 clientCxnSocket 对象
    try {
        Constructor<?> clientCxnConstructor = Class.forName(clientCnxnSocketName)
                                                    .getDeclaredConstructor(ZKClientConfig.class);
        ClientCnxnSocket clientCxnSocket = (ClientCnxnSocket) clientCxnConstructor.newInstance(getClientConfig());
        return clientCxnSocket;
    } catch (Exception e) {
        throw new IOException("Couldn't instantiate " + clientCnxnSocketName, e);
    }
}
  1. 创建底层连接方法
java
ClientCnxn createConnection(
    String chrootPath,
    HostProvider hostProvider,
    int sessionTimeout,
    ZKClientConfig clientConfig,
    Watcher defaultWatcher,
    ClientCnxnSocket clientCnxnSocket,
    boolean canBeReadOnly
) throws IOException {
    return new ClientCnxn(
        chrootPath,
        hostProvider,
        sessionTimeout,
        clientConfig,
        defaultWatcher,
        clientCnxnSocket,
        canBeReadOnly);
}
  1. ClientCnxn对象创建
java
public ClientCnxn(
    String chrootPath,
    HostProvider hostProvider,
    int sessionTimeout,
    ZKClientConfig clientConfig,
    Watcher defaultWatcher,
    ClientCnxnSocket clientCnxnSocket,
    boolean canBeReadOnly
) throws IOException {
    this(
        chrootPath,
        hostProvider,
        sessionTimeout,
        clientConfig,
        defaultWatcher,
        clientCnxnSocket,
        0,
        new byte[16],
        canBeReadOnly);
}
public ClientCnxn(
    String chrootPath,
    HostProvider hostProvider,
    int sessionTimeout,
    ZKClientConfig clientConfig,
    Watcher defaultWatcher,
    ClientCnxnSocket clientCnxnSocket,
    long sessionId,
    byte[] sessionPasswd,
    boolean canBeReadOnly
) throws IOException {
    this.chrootPath = chrootPath;
    this.hostProvider = hostProvider;
    this.sessionTimeout = sessionTimeout;
    this.clientConfig = clientConfig;
    this.sessionId = sessionId;
    this.sessionPasswd = sessionPasswd;
    this.readOnly = canBeReadOnly;

    this.watchManager = new ZKWatchManager(
            clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET),
            defaultWatcher);

    this.connectTimeout = sessionTimeout / hostProvider.size();
    this.readTimeout = sessionTimeout * 2 / 3;
    // 创建了两个线程
    this.sendThread = new SendThread(clientCnxnSocket);
    this.eventThread = new EventThread();
    initRequestTimeout();
}
  1. ClientCnxn启动就运行sendThread、eventThread线程
java
public void start() {
    sendThread.start();
    eventThread.start();
}
  1. 两个线程都是后台线程
java
SendThread(ClientCnxnSocket clientCnxnSocket) throws IOException {
    super(makeThreadName("-SendThread()"));
    changeZkState(States.CONNECTING);
    this.clientCnxnSocket = clientCnxnSocket;
    setDaemon(true);
}
EventThread() {
    super(makeThreadName("-EventThread"));
    setDaemon(true);
}
  1. SendThread、EventThread都继承了ZooKeeperThread类,查看run方法
    SendThread.java
java
public void run() {
    clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
    clientCnxnSocket.updateNow();
    clientCnxnSocket.updateLastSendAndHeard();
    int to;
    long lastPingRwServer = Time.currentElapsedTime();
    final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
    InetSocketAddress serverAddress = null;
    // 在循环里面,循环发送,循环接收
    while (state.isAlive()) {
        try {
            if (!clientCnxnSocket.isConnected()) {
                // don't re-establish connection if we are closing
                if (closing) {
                    break;
                }
                if (rwServerAddress != null) {
                    serverAddress = rwServerAddress;
                    rwServerAddress = null;
                } else {
                    serverAddress = hostProvider.next(1000);
                }
                onConnecting(serverAddress);
                // 启动连接服务端
                startConnect(serverAddress);
                // Update now to start the connection timer right after we make a connection attempt
                clientCnxnSocket.updateNow();
                clientCnxnSocket.updateLastSendAndHeard();
            }

            if (state.isConnected()) {
                // determine whether we need to send an AuthFailed event.
                if (zooKeeperSaslClient != null) {
                    boolean sendAuthEvent = false;
                    if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                        try {
                            zooKeeperSaslClient.initialize(ClientCnxn.this);
                        } catch (SaslException e) {
                            LOG.error("SASL authentication with Zookeeper Quorum member failed.", e);
                            changeZkState(States.AUTH_FAILED);
                            sendAuthEvent = true;
                        }
                    }
                    KeeperState authState = zooKeeperSaslClient.getKeeperState();
                    if (authState != null) {
                        if (authState == KeeperState.AuthFailed) {
                            // An authentication error occurred during authentication with the Zookeeper Server.
                            changeZkState(States.AUTH_FAILED);
                            sendAuthEvent = true;
                        } else {
                            if (authState == KeeperState.SaslAuthenticated) {
                                sendAuthEvent = true;
                            }
                        }
                    }

                    if (sendAuthEvent) {
                        eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, authState, null));
                        if (state == States.AUTH_FAILED) {
                            eventThread.queueEventOfDeath();
                        }
                    }
                }
                to = readTimeout - clientCnxnSocket.getIdleRecv();
            } else {
                to = connectTimeout - clientCnxnSocket.getIdleRecv();
            }

            if (to <= 0) {
                String warnInfo = String.format(
                    "Client session timed out, have not heard from server in %dms for session id 0x%s",
                    clientCnxnSocket.getIdleRecv(),
                    Long.toHexString(sessionId));
                LOG.warn(warnInfo);
                throw new SessionTimeoutException(warnInfo);
            }
            if (state.isConnected()) {
                //1000(1 second) is to prevent race condition missing to send the second ping
                //also make sure not to send too many pings when readTimeout is small
                int timeToNextPing = readTimeout / 2
                                        - clientCnxnSocket.getIdleSend()
                                        - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
                if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
                    sendPing();
                    clientCnxnSocket.updateLastSend();
                } else {
                    if (timeToNextPing < to) {
                        to = timeToNextPing;
                    }
                }
            }

            // If we are in read-only mode, seek for read/write server
            if (state == States.CONNECTEDREADONLY) {
                long now = Time.currentElapsedTime();
                int idlePingRwServer = (int) (now - lastPingRwServer);
                if (idlePingRwServer >= pingRwTimeout) {
                    lastPingRwServer = now;
                    idlePingRwServer = 0;
                    pingRwTimeout = Math.min(2 * pingRwTimeout, maxPingRwTimeout);
                    pingRwServer();
                }
                to = Math.min(to, pingRwTimeout - idlePingRwServer);
            }
            // 接收服务端响应,并处理
            clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
        } catch (Throwable e) {
            ......
        }
    }

    synchronized (outgoingQueue) {
        // When it comes to this point, it guarantees that later queued
        // packet to outgoingQueue will be notified of death.
        cleanup();
    }
    clientCnxnSocket.close();
    if (state.isAlive()) {
        eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));
    }
    eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Closed, null));

    Login l = loginRef.getAndSet(null);
    if (l != null) {
        l.shutdown();
    }
    ZooTrace.logTraceMessage(
        LOG,
        ZooTrace.getTextTraceLevel(),
        "SendThread exited loop for session: 0x" + Long.toHexString(getSessionId()));
}
  1. 开始连接指定地址
    ClientCnxn.java
java
private void startConnect(InetSocketAddress addr) throws IOException {
    // initializing it for new connection
    saslLoginFailed = false;
    if (!isFirstConnect) {
        try {
            Thread.sleep(ThreadLocalRandom.current().nextLong(1000));
        } catch (InterruptedException e) {
            LOG.warn("Unexpected exception", e);
        }
    }
    changeZkState(States.CONNECTING);

    String hostPort = addr.getHostString() + ":" + addr.getPort();
    MDC.put("myid", hostPort);
    setName(getName().replaceAll("\\(.*\\)", "(" + hostPort + ")"));
    if (clientConfig.isSaslClientEnabled()) {
        try {
            zooKeeperSaslClient = new ZooKeeperSaslClient(
                SaslServerPrincipal.getServerPrincipal(addr, clientConfig), clientConfig, loginRef);
        } catch (LoginException e) {
            // An authentication error occurred when the SASL client tried to initialize:
            // for Kerberos this means that the client failed to authenticate with the KDC.
            // This is different from an authentication error that occurs during communication
            // with the Zookeeper server, which is handled below.
            LOG.warn(
                "SASL configuration failed. "
                    + "Will continue connection to Zookeeper server without "
                    + "SASL authentication, if Zookeeper server allows it.", e);
            eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null));
            saslLoginFailed = true;
        }
    }
    logStartConnect(addr);
    // 建立连接
    clientCnxnSocket.connect(addr);
}
  1. 查找connect实现类,ClientCnxnSocketNIO.java
java
@Override
void connect(InetSocketAddress addr) throws IOException {
    SocketChannel sock = createSock();
    try {
        registerAndConnect(sock, addr);
    } catch (UnresolvedAddressException | UnsupportedAddressTypeException | SecurityException | IOException e) {
        LOG.error("Unable to open socket to {}", addr);
        sock.close();
        throw e;
    }
    initialized = false;

    /*
        * Reset incomingBuffer
        */
    lenBuffer.clear();
    incomingBuffer = lenBuffer;
}
  1. 注册连接
java
void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException {
    sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
    boolean immediateConnect = sock.connect(addr);
    if (immediateConnect) {
        sendThread.primeConnection();
    }
}
  1. 连接认证
java
void primeConnection() throws IOException {
    LOG.info(
        "Socket connection established, initiating session, client: {}, server: {}",
        clientCnxnSocket.getLocalSocketAddress(),
        clientCnxnSocket.getRemoteSocketAddress());
    // 标记不是第一次连接
    isFirstConnect = false;
    long sessId = (seenRwServerBefore) ? sessionId : 0;
    ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd);
    // We add backwards since we are pushing into the front
    // Only send if there's a pending watch
    if (!clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET)) {
        List<String> dataWatches = watchManager.getDataWatchList();
        List<String> existWatches = watchManager.getExistWatchList();
        List<String> childWatches = watchManager.getChildWatchList();
        List<String> persistentWatches = watchManager.getPersistentWatchList();
        List<String> persistentRecursiveWatches = watchManager.getPersistentRecursiveWatchList();
        if (!dataWatches.isEmpty() || !existWatches.isEmpty() || !childWatches.isEmpty()
                || !persistentWatches.isEmpty() || !persistentRecursiveWatches.isEmpty()) {
            Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();
            Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
            Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
            Iterator<String> persistentWatchesIter = prependChroot(persistentWatches).iterator();
            Iterator<String> persistentRecursiveWatchesIter = prependChroot(persistentRecursiveWatches).iterator();
            long setWatchesLastZxid = lastZxid;

            while (dataWatchesIter.hasNext() || existWatchesIter.hasNext() || childWatchesIter.hasNext()
                    || persistentWatchesIter.hasNext() || persistentRecursiveWatchesIter.hasNext()) {
                List<String> dataWatchesBatch = new ArrayList<String>();
                List<String> existWatchesBatch = new ArrayList<String>();
                List<String> childWatchesBatch = new ArrayList<String>();
                List<String> persistentWatchesBatch = new ArrayList<String>();
                List<String> persistentRecursiveWatchesBatch = new ArrayList<String>();
                int batchLength = 0;

                // Note, we may exceed our max length by a bit when we add the last
                // watch in the batch. This isn't ideal, but it makes the code simpler.
                while (batchLength < SET_WATCHES_MAX_LENGTH) {
                    final String watch;
                    if (dataWatchesIter.hasNext()) {
                        watch = dataWatchesIter.next();
                        dataWatchesBatch.add(watch);
                    } else if (existWatchesIter.hasNext()) {
                        watch = existWatchesIter.next();
                        existWatchesBatch.add(watch);
                    } else if (childWatchesIter.hasNext()) {
                        watch = childWatchesIter.next();
                        childWatchesBatch.add(watch);
                    }  else if (persistentWatchesIter.hasNext()) {
                        watch = persistentWatchesIter.next();
                        persistentWatchesBatch.add(watch);
                    } else if (persistentRecursiveWatchesIter.hasNext()) {
                        watch = persistentRecursiveWatchesIter.next();
                        persistentRecursiveWatchesBatch.add(watch);
                    } else {
                        break;
                    }
                    batchLength += watch.length();
                }

                Record record;
                int opcode;
                if (persistentWatchesBatch.isEmpty() && persistentRecursiveWatchesBatch.isEmpty()) {
                    // maintain compatibility with older servers - if no persistent/recursive watchers
                    // are used, use the old version of SetWatches
                    record = new SetWatches(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch, childWatchesBatch);
                    opcode = OpCode.setWatches;
                } else {
                    record = new SetWatches2(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch,
                            childWatchesBatch, persistentWatchesBatch, persistentRecursiveWatchesBatch);
                    opcode = OpCode.setWatches2;
                }
                RequestHeader header = new RequestHeader(ClientCnxn.SET_WATCHES_XID, opcode);
                Packet packet = new Packet(header, new ReplyHeader(), record, null, null);
                outgoingQueue.addFirst(packet);
            }
        }
    }

    for (AuthData id : authInfo) {
        outgoingQueue.addFirst(
            new Packet(
                new RequestHeader(ClientCnxn.AUTHPACKET_XID, OpCode.auth),
                null,
                new AuthPacket(0, id.scheme, id.data),
                null,
                null));
    }
    outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly));
    clientCnxnSocket.connectionPrimed();
    LOG.debug("Session establishment request sent on {}", clientCnxnSocket.getRemoteSocketAddress());
}
  1. 查找 doTransport 实现类,ClientCnxnSocketNIO.java
java
@Override
void doTransport(
    int waitTimeOut,
    Queue<Packet> pendingQueue,
    ClientCnxn cnxn) throws IOException, InterruptedException {
    selector.select(waitTimeOut);
    Set<SelectionKey> selected;
    synchronized (this) {
        selected = selector.selectedKeys();
    }
    // Everything below and until we get back to the select is
    // non-blocking, so time is effectively a constant. That is
    // Why we just have to do this once, here
    updateNow();
    for (SelectionKey k : selected) {
        SocketChannel sc = ((SocketChannel) k.channel());
        if (k.isConnectable()) {
            if (sc.finishConnect()) {
                updateLastSendAndHeard();
                updateSocketAddresses();
                sendThread.primeConnection();
            }
        } else if (k.isReadable() || k.isWritable()) {
            doIO(pendingQueue, cnxn);
        }
    }
    if (sendThread.getZkState().isConnected()) {
        if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
            enableWrite();
        }
    }
    selected.clear();
}
  1. 处理底层IO流
java
void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
    SocketChannel sock = (SocketChannel) sockKey.channel();
    if (sock == null) {
        throw new IOException("Socket is null!");
    }
    if (sockKey.isReadable()) {
        int rc = sock.read(incomingBuffer);
        if (rc < 0) {
            throw new EndOfStreamException("Unable to read additional data from server sessionid 0x"
                                            + Long.toHexString(sessionId)
                                            + ", likely server has closed socket");
        }
        if (!incomingBuffer.hasRemaining()) {
            incomingBuffer.flip();
            if (incomingBuffer == lenBuffer) {
                recvCount.getAndIncrement();
                readLength();
            } else if (!initialized) {
                readConnectResult();
                enableRead();
                if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
                    // Since SASL authentication has completed (if client is configured to do so),
                    // outgoing packets waiting in the outgoingQueue can now be sent.
                    enableWrite();
                }
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                updateLastHeard();
                initialized = true;
            } else {
                // 读取服务端应答
                sendThread.readResponse(incomingBuffer);
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                updateLastHeard();
            }
        }
    }
    if (sockKey.isWritable()) {
        Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());

        if (p != null) {
            updateLastSend();
            // If we already started writing p, p.bb will already exist
            if (p.bb == null) {
                if ((p.requestHeader != null)
                    && (p.requestHeader.getType() != OpCode.ping)
                    && (p.requestHeader.getType() != OpCode.auth)) {
                    p.requestHeader.setXid(cnxn.getXid());
                }
                p.createBB();
            }
            sock.write(p.bb);
            if (!p.bb.hasRemaining()) {
                sentCount.getAndIncrement();
                outgoingQueue.removeFirstOccurrence(p);
                if (p.requestHeader != null
                    && p.requestHeader.getType() != OpCode.ping
                    && p.requestHeader.getType() != OpCode.auth) {
                    synchronized (pendingQueue) {
                        pendingQueue.add(p);
                    }
                }
            }
        }
        if (outgoingQueue.isEmpty()) {
            // No more packets to send: turn off write interest flag.
            // Will be turned on later by a later call to enableWrite(),
            // from within ZooKeeperSaslClient (if client is configured
            // to attempt SASL authentication), or in either doIO() or
            // in doTransport() if not.
            disableWrite();
        } else if (!initialized && p != null && !p.bb.hasRemaining()) {
            // On initial connection, write the complete connect request
            // packet, but then disable further writes until after
            // receiving a successful connection response.  If the
            // session is expired, then the server sends the expiration
            // response and immediately closes its end of the socket.  If
            // the client is simultaneously writing on its end, then the
            // TCP stack may choose to abort with RST, in which case the
            // client would never receive the session expired event.  See
            // http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
            disableWrite();
        } else {
            // Just in case
            enableWrite();
        }
    }
}

6. 执行run()

  1. 连接后执行run()方法
java
void run() throws IOException, InterruptedException {
    if (cl.getCommand() == null) {
        System.out.println("Welcome to ZooKeeper!");

        boolean jlinemissing = false;
        // only use jline if it's in the classpath
        try {
            Class<?> consoleC = Class.forName("jline.console.ConsoleReader");
            Class<?> completorC = Class.forName("org.apache.zookeeper.JLineZNodeCompleter");

            System.out.println("JLine support is enabled");

            Object console = consoleC.getConstructor().newInstance();

            Object completor = completorC.getConstructor(ZooKeeper.class).newInstance(zk);
            Method addCompletor = consoleC.getMethod("addCompleter", Class.forName("jline.console.completer.Completer"));
            addCompletor.invoke(console, completor);

            String line;
            Method readLine = consoleC.getMethod("readLine", String.class);
            while ((line = (String) readLine.invoke(console, getPrompt())) != null) {
                executeLine(line);
            }
        } catch (ClassNotFoundException
            | NoSuchMethodException
            | InvocationTargetException
            | IllegalAccessException
            | InstantiationException e
        ) {
            LOG.debug("Unable to start jline", e);
            jlinemissing = true;
        }

        if (jlinemissing) {
            System.out.println("JLine support is disabled");
            BufferedReader br = new BufferedReader(new InputStreamReader(System.in));

            String line;
            while ((line = br.readLine()) != null) {
                // 一行一行读取命令
                executeLine(line);
            }
        }
    } else {
        // Command line args non-null.  Run what was passed.
        processCmd(cl);
    }
    ServiceUtils.requestSystemExit(exitCode);
}
  1. 执行命令
java
public void executeLine(String line) throws InterruptedException, IOException {
    if (!line.equals("")) {
        cl.parseCommand(line);
        addToHistory(commandCount, line);
        // 处理客户端命令
        processCmd(cl);
        commandCount++;
    }
}
  1. 解析命令
java
protected boolean processCmd(MyCommandOptions co) throws IOException, InterruptedException {
    boolean watch = false;
    try {
        // 解析命令
        watch = processZKCmd(co);
        exitCode = ExitCode.EXECUTION_FINISHED.getValue();
    } catch (CliException ex) {
        exitCode = ex.getExitCode();
        System.err.println(ex.getMessage());
    }
    return watch;
}
  1. 实际解析命令方法
java
protected boolean processZKCmd(MyCommandOptions co) throws CliException, IOException, InterruptedException {
    String[] args = co.getArgArray();
    String cmd = co.getCommand();
    if (args.length < 1) {
        usage();
        throw new MalformedCommandException("No command entered");
    }

    if (!commandMap.containsKey(cmd)) {
        usage();
        throw new CommandNotFoundException("Command not found " + cmd);
    }

    boolean watch = false;

    LOG.debug("Processing {}", cmd);

    if (cmd.equals("quit")) {
        zk.close();
        ServiceUtils.requestSystemExit(exitCode);
    } else if (cmd.equals("redo") && args.length >= 2) {
        Integer i = Integer.decode(args[1]);
        if (commandCount <= i || i < 0) { // don't allow redoing this redo
            throw new MalformedCommandException("Command index out of range");
        }
        cl.parseCommand(history.get(i));
        if (cl.getCommand().equals("redo")) {
            throw new MalformedCommandException("No redoing redos");
        }
        history.put(commandCount, history.get(i));
        processCmd(cl);
    } else if (cmd.equals("history")) {
        for (int i = commandCount - 10; i <= commandCount; ++i) {
            if (i < 0) {
                continue;
            }
            System.out.println(i + " - " + history.get(i));
        }
    } else if (cmd.equals("printwatches")) {
        if (args.length == 1) {
            System.out.println("printwatches is " + (printWatches ? "on" : "off"));
        } else {
            printWatches = args[1].equals("on");
        }
    } else if (cmd.equals("connect")) {
        if (args.length >= 2) {
            connectToZK(args[1]);
        } else {
            connectToZK(host);
        }
    }

    // Below commands all need a live connection
    if (zk == null || !zk.getState().isAlive()) {
        System.out.println("Not connected");
        return false;
    }

    // execute from commandMap
    CliCommand cliCmd = commandMapCli.get(cmd);
    if (cliCmd != null) {
        cliCmd.setZk(zk);
        watch = cliCmd.parse(args).exec();
    } else if (!commandMap.containsKey(cmd)) {
        usage();
    }
    return watch;
}