Skip to content

源码之ZK选举

1. ZK选举总流程

Alt text

2. ZK选举准备流程

Alt text

3. 开始选举源码

  1. 选举投票准备 QuorumPeer.java
java
 /**
  * Startup entry point of the quorum peer.
  *
  * <p>Checks that {@code myid} is a key of {@code getView()}, then calls, in order:
  * {@code loadDataBase()}, {@code startServerCnxnFactory()}, {@code adminServer.start()},
  * {@code startLeaderElection()}, {@code startJvmPauseMonitor()} and {@code super.start()}.
  *
  * @throws RuntimeException if {@code myid} is not contained in the peer view
  */
 @Override
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    loadDataBase();
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        // A failure to start the admin server is only logged; startup continues.
        LOG.warn("Problem starting AdminServer", e);
    }
    // Election preparation
    startLeaderElection();
    startJvmPauseMonitor();
    super.start();
}
  1. 创建选票
java
/**
 * Prepares leader election for this peer.
 *
 * <p>While the peer state is {@code LOOKING}, the initial vote is built from
 * {@code (myid, getLastLoggedZxid(), getCurrentEpoch())}, i.e. every server first
 * votes for itself. Afterwards the election algorithm instance selected by
 * {@code electionType} is created and stored in {@code electionAlg}.
 *
 * @throws RuntimeException wrapping the {@link IOException} raised while building the vote
 */
public synchronized void startLeaderElection() {
    try {
        if (getPeerState() == ServerState.LOOKING) {
            // A vote consists of the server id, the zxid and the epoch;
            // at the start of an election each server proposes itself.
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        // Chain the original exception so its type and stack trace are preserved
        // instead of copying only the message and trace onto a new exception.
        throw new RuntimeException(e.getMessage(), e);
    }
    // Create the election algorithm instance
    this.electionAlg = createElectionAlgorithm(electionType);
}
  1. 开始进行投票
java
/**
 * Creates the election implementation for the given algorithm id.
 *
 * <p>Ids 1 and 2 are rejected. Id 3 creates a new {@code QuorumCnxManager}, halts a
 * previously registered one, starts the manager's listener and returns a started
 * {@code FastLeaderElection}.
 *
 * @param electionAlgorithm numeric id of the election algorithm
 * @return the started election instance, or {@code null} if the connection manager
 *         has no listener or the id is not handled
 * @throws UnsupportedOperationException for algorithm ids 1 and 2
 */
@SuppressWarnings("deprecation")
protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election election = null;

    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 1:
        throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
    case 2:
        throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
    case 3:
        // Step 1: create the QuorumCnxManager, which (per the accompanying article)
        // handles the network communication of the election.
        QuorumCnxManager freshManager = createCnxnManager();
        QuorumCnxManager previousManager = qcmRef.getAndSet(freshManager);
        if (previousManager != null) {
            LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
            previousManager.halt();
        }
        QuorumCnxManager.Listener electionListener = freshManager.listener;
        if (electionListener == null) {
            LOG.error("Null listener when initializing cnx manager");
            break;
        }
        // Step 2: start the listener thread
        electionListener.start();
        // Step 3: get ready to run the election
        FastLeaderElection fastElection = new FastLeaderElection(this, freshManager);
        fastElection.start();
        election = fastElection;
        break;
    default:
        assert false;
    }
    return election;
}
  1. 网络通信组件初始化
java
/**
 * Builds the connection manager used for quorum (election) traffic.
 *
 * <p>The socket timeout is {@code quorumCnxnTimeoutMs} when that value is positive,
 * otherwise {@code tickTime * syncLimit}.
 *
 * @return a new {@code QuorumCnxManager} configured from this peer's settings
 */
public QuorumCnxManager createCnxnManager() {
    final int timeout;
    if (quorumCnxnTimeoutMs > 0) {
        timeout = quorumCnxnTimeoutMs;
    } else {
        timeout = this.tickTime * this.syncLimit;
    }
    LOG.info("Using {}ms as the quorum cnxn socket timeout", timeout);

    return new QuorumCnxManager(
        this,
        this.getMyId(),
        this.getView(),
        this.authServer,
        this.authLearner,
        timeout,
        this.getQuorumListenOnAllIPs(),
        this.quorumCnxnThreadsSize,
        this.isQuorumSaslAuthEnabled());
}

/**
 * Creates the connection manager: allocates its queues and maps, stores the supplied
 * configuration, initializes the connection executor and creates (but does not start)
 * the listener thread.
 *
 * @param self                  the owning quorum peer
 * @param mySid                 server id of this peer
 * @param view                  map from server id to quorum server
 * @param authServer            quorum auth server, stored as-is
 * @param authLearner           quorum auth learner, stored as-is
 * @param socketTimeout         socket timeout value, stored as-is
 * @param listenOnAllIPs        stored as-is
 * @param quorumCnxnThreadsSize passed to {@code initializeConnectionExecutor}
 * @param quorumSaslAuthEnabled stored as-is
 * @throws NumberFormatException if the {@code zookeeper.cnxTimeout} system property
 *                               is set but is not a parsable integer
 */
public QuorumCnxManager(QuorumPeer self, final long mySid, Map<Long, QuorumPeer.QuorumServer> view,
    QuorumAuthServer authServer, QuorumAuthLearner authLearner, int socketTimeout, boolean listenOnAllIPs,
    int quorumCnxnThreadsSize, boolean quorumSaslAuthEnabled) {
    // Create the queues/maps (those shown in the article's diagram):
    // recvQueue, queueSendMap, senderWorkerMap
    this.recvQueue = new CircularBlockingQueue<>(RECV_CAPACITY);
    this.queueSendMap = new ConcurrentHashMap<>();
    this.senderWorkerMap = new ConcurrentHashMap<>();
    this.lastMessageSent = new ConcurrentHashMap<>();

    // Optional override of cnxTO from a system property
    String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
    if (cnxToValue != null) {
        this.cnxTO = Integer.parseInt(cnxToValue);
    }

    this.self = self;

    this.mySid = mySid;
    this.socketTimeout = socketTimeout;
    this.view = view;
    this.listenOnAllIPs = listenOnAllIPs;
    this.authServer = authServer;
    this.authLearner = authLearner;
    this.quorumSaslAuthEnabled = quorumSaslAuthEnabled;

    initializeConnectionExecutor(mySid, quorumCnxnThreadsSize);

    // Starts listener thread that waits for connection requests
    // (the thread object is only created and named here; start() is called by the caller)
    listener = new Listener();
    listener.setName("QuorumPeerListener");
}
  1. 监听线程初始化:点击进入QuorumCnxManager.Listener,可以看到它是一个继承了线程的类,找到其对应的run()方法
java
/**
 * Listener thread body (abridged excerpt; {@code ......} marks code omitted by the article).
 *
 * <p>Unless shut down, creates one {@code ListenerHandler} per entry of {@code addresses}
 * and submits them all to a fixed thread pool sized to the number of addresses.
 */
@Override
public void run() {
    if (!shutdown) {
        ......
        // One latch count per address; each ListenerHandler receives the latch
        CountDownLatch latch = new CountDownLatch(addresses.size());
        // ListenerHandler is where listening on the election port is implemented
        listenerHandlers = addresses.stream().map(address ->
                        new ListenerHandler(address, self.shouldUsePortUnification(), self.isSslQuorum(), latch))
                .collect(Collectors.toList());
        final ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
        try {
            listenerHandlers.forEach(executor::submit);
        } finally {
            // prevent executor's threads to leak after ListenerHandler tasks complete
            executor.shutdown();
        }
        ......
    }
    ......
}
  1. ListenerHandler也是线程类,查看run()方法
java
/**
 * Handler task for a single listen address: names the current thread after the address,
 * runs {@code acceptConnections()}, then closes the handler.
 *
 * <p>The latch is always counted down when the task ends, whether normally or through an exception.
 */
@Override
public void run() {
    try {
        Thread.currentThread().setName("ListenerHandler-" + address);
        acceptConnections();
        try {
            close();
        } catch (IOException e) {
            // A failure while closing is only logged
            LOG.warn("Exception when shutting down listener: ", e);
        }
    } catch (Exception e) {
        // Output of unexpected exception, should never happen
        LOG.error("Unexpected error ", e);
    } finally {
        latch.countDown();
    }
}
  1. 处理监听
java
/**
 * Accept loop for election connections (abridged excerpt; {@code ......} marks code
 * omitted by the article).
 *
 * <p>The outer loop (re)creates the server socket while not shut down and while the retry
 * budget allows ({@code portBindMaxRetry == 0} places no limit on {@code numRetries}).
 * The inner loop accepts clients until shutdown and hands each one to
 * {@code receiveConnectionAsync} or {@code receiveConnection}.
 */
private void acceptConnections() {
    int numRetries = 0;
    Socket client = null;

    while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
        try {
            // Create the server socket (bound to the server address)
            serverSocket = createNewServerSocket();
            LOG.info("{} is accepting connections now, my election bind port: {}", QuorumCnxManager.this.mySid, address.toString());
            // Keep listening as long as we are not shut down
            while (!shutdown) {
                try {
                    // Blocks until a connection request arrives
                    client = serverSocket.accept();
                    setSockOpts(client);
                    LOG.info("Received connection request from {}", client.getRemoteSocketAddress());
                    // Receive and handle the connection request
                    // asynchronously if the quorum sasl authentication is
                    // enabled. This is required because sasl server
                    // authentication process may take few seconds to finish,
                    // this may delay next peer connection requests.
                    if (quorumSaslAuthEnabled) {
                        receiveConnectionAsync(client);
                    } else {
                        receiveConnection(client);
                    }
                    // A handled connection resets the retry counter
                    numRetries = 0;
                } catch (SocketTimeoutException e) {
                    LOG.warn("The socket is listening for the election accepted "
                            + "and it timed out unexpectedly, but will retry."
                            + "see ZOOKEEPER-2836");
                }
            }
        } catch (IOException e) {
            ......
        }
    }
    ......
}
  1. FastLeaderElection类进行初始化
java
/**
 * Creates the election implementation for the given algorithm id.
 *
 * <p>Ids 1 and 2 are rejected. Id 3 creates a new {@code QuorumCnxManager}, halts a
 * previously registered one, starts the manager's listener and returns a started
 * {@code FastLeaderElection}.
 *
 * @param electionAlgorithm numeric id of the election algorithm
 * @return the started election instance, or {@code null} if the connection manager
 *         has no listener or the id is not handled
 * @throws UnsupportedOperationException for algorithm ids 1 and 2
 */
protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election election = null;

    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 1:
        throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
    case 2:
        throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
    case 3:
        QuorumCnxManager freshManager = createCnxnManager();
        QuorumCnxManager previousManager = qcmRef.getAndSet(freshManager);
        if (previousManager != null) {
            LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
            previousManager.halt();
        }
        QuorumCnxManager.Listener electionListener = freshManager.listener;
        if (electionListener == null) {
            // Without a listener no election object is created
            LOG.error("Null listener when initializing cnx manager");
            break;
        }
        electionListener.start();
        FastLeaderElection fastElection = new FastLeaderElection(this, freshManager);
        fastElection.start();
        election = fastElection;
        break;
    default:
        assert false;
    }
    return election;
}
  1. FastLeaderElection初始化,准备sendqueue,recvqueue
java
/**
 * Creates a fast leader election bound to the given peer and connection manager.
 *
 * @param self    the quorum peer that owns this election
 * @param manager the connection manager used by this election
 */
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {
    this.stop = false;
    this.manager = manager;
    this.starter(self, manager);
}

/**
 * Resets the proposal to "none" (-1), creates the send and receive queues and
 * builds the messenger on top of the connection manager.
 *
 * @param self    the quorum peer that owns this election
 * @param manager the connection manager handed to the messenger
 */
private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    this.proposedLeader = -1;
    this.proposedZxid = -1;

    this.sendqueue = new LinkedBlockingQueue<>();
    this.recvqueue = new LinkedBlockingQueue<>();
    this.messenger = new Messenger(manager);
}

4. 选举执行中

Alt text

  1. 选举入口
    QuorumPeer.java的start()方法
java
 /**
  * Startup entry point of the quorum peer, and thereby the entry point of the election.
  *
  * <p>Checks that {@code myid} is a key of {@code getView()}, then calls, in order:
  * {@code loadDataBase()}, {@code startServerCnxnFactory()}, {@code adminServer.start()},
  * {@code startLeaderElection()}, {@code startJvmPauseMonitor()} and {@code super.start()}.
  *
  * @throws RuntimeException if {@code myid} is not contained in the peer view
  */
 @Override
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    // Cold-start data recovery
    loadDataBase();
    // Start the connection factory instance
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        // A failure to start the admin server is only logged; startup continues.
        LOG.warn("Problem starting AdminServer", e);
    }
    // Prepare the election environment
    startLeaderElection();
    startJvmPauseMonitor();
    // Run the election (per the article, this ends up executing this class's run())
    super.start();
}
  1. 执行super.start()就相当于执行QuorumPeer.java类中的run()方法
    ZooKeeper启动后,每台服务器首先都处于LOOKING状态;通过选举,其中一台服务器成为Leader,其他服务器成为Follower。
    QuorumPeer.java
java
/**
 * Main loop of the quorum peer thread.
 *
 * <p>First registers the JMX beans for the quorum, the local peer and the remote peers
 * (registration failures are only logged). Then, while {@code running}, dispatches on
 * {@code getPeerState()}:
 * <ul>
 *   <li>{@code LOOKING}: runs {@code lookForLeader()} and stores the returned vote,
 *       optionally alongside a delayed read-only server;</li>
 *   <li>{@code OBSERVING}: creates an observer and calls {@code observeLeader()};</li>
 *   <li>{@code FOLLOWING}: creates a follower and calls {@code followLeader()};</li>
 *   <li>{@code LEADING}: creates a leader and calls {@code lead()}.</li>
 * </ul>
 * When the loop exits, all registered JMX beans are unregistered and the references cleared.
 */
@Override
public void run() {
    updateThreadName();

    LOG.debug("Starting quorum peer");
    try {
        jmxQuorumBean = new QuorumBean(this);
        MBeanRegistry.getInstance().register(jmxQuorumBean, null);
        for (QuorumServer s : getView().values()) {
            ZKMBeanInfo p;
            if (getMyId() == s.id) {
                // This entry of the view is the local peer
                p = jmxLocalPeerBean = new LocalPeerBean(this);
                try {
                    MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                } catch (Exception e) {
                    LOG.warn("Failed to register with JMX", e);
                    jmxLocalPeerBean = null;
                }
            } else {
                RemotePeerBean rBean = new RemotePeerBean(this, s);
                try {
                    MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
                    jmxRemotePeerBean.put(s.id, rBean);
                } catch (Exception e) {
                    LOG.warn("Failed to register with JMX", e);
                }
            }
        }
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        jmxQuorumBean = null;
    }

    try {
        while (running) {
            // Record the start of the unavailable period only if none is recorded yet
            if (unavailableStartTime == 0) {
                unavailableStartTime = Time.currentElapsedTime();
            }

            switch (getPeerState()) {
            case LOOKING:
                LOG.info("LOOKING");
                ServerMetrics.getMetrics().LOOKING_COUNT.add(1);

                if (Boolean.getBoolean("readonlymode.enabled")) {
                    LOG.info("Attempting to start ReadOnlyZooKeeperServer");

                    // Create read-only server but don't start it immediately
                    final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);

                    // Instead of starting roZk immediately, wait some grace
                    // period before we decide we're partitioned.
                    //
                    // Thread is used here because otherwise it would require
                    // changes in each of election strategy classes which is
                    // unnecessary code coupling.
                    Thread roZkMgr = new Thread() {
                        public void run() {
                            try {
                                // lower-bound grace period to 2 secs
                                sleep(Math.max(2000, tickTime));
                                if (ServerState.LOOKING.equals(getPeerState())) {
                                    roZk.startup();
                                }
                            } catch (InterruptedException e) {
                                LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                            } catch (Exception e) {
                                LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                            }
                        }
                    };
                    try {
                        roZkMgr.start();
                        reconfigFlagClear();
                        if (shuttingDownLE) {
                            shuttingDownLE = false;
                            startLeaderElection();
                        }
                        // Run the election; when it ends, lookForLeader() returns the
                        // winning vote, i.e. the vote for the elected leader
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                        setPeerState(ServerState.LOOKING);
                    } finally {
                        // If the thread is in the the grace period, interrupt
                        // to come out of waiting.
                        roZkMgr.interrupt();
                        roZk.shutdown();
                    }
                } else {
                    try {
                        reconfigFlagClear();
                        if (shuttingDownLE) {
                            shuttingDownLE = false;
                            startLeaderElection();
                        }
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                        setPeerState(ServerState.LOOKING);
                    }
                }
                break;
            case OBSERVING:
                try {
                    LOG.info("OBSERVING");
                    setObserver(makeObserver(logFactory));
                    observer.observeLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception", e);
                } finally {
                    // NOTE(review): unlike the LEADING branch there is no null check here;
                    // if makeObserver() throws, observer may still be null — confirm.
                    observer.shutdown();
                    setObserver(null);
                    updateServerState();

                    // Add delay jitter before we switch to LOOKING
                    // state to reduce the load of ObserverMaster
                    if (isRunning()) {
                        Observer.waitForObserverElectionDelay();
                    }
                }
                break;
            case FOLLOWING:
                try {
                    LOG.info("FOLLOWING");
                    setFollower(makeFollower(logFactory));
                    follower.followLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception", e);
                } finally {
                    // NOTE(review): no null check here either; if makeFollower() throws,
                    // follower may still be null — confirm.
                    follower.shutdown();
                    setFollower(null);
                    updateServerState();
                }
                break;
            case LEADING:
                LOG.info("LEADING");
                try {
                    setLeader(makeLeader(logFactory));
                    leader.lead();
                    setLeader(null);
                } catch (Exception e) {
                    LOG.warn("Unexpected exception", e);
                } finally {
                    // leader is still non-null only if lead() did not return normally
                    if (leader != null) {
                        leader.shutdown("Forcing shutdown");
                        setLeader(null);
                    }
                    updateServerState();
                }
                break;
            }
        }
    } finally {
        LOG.warn("QuorumPeer main thread exited");
        // Unregister every JMX bean and drop the references
        MBeanRegistry instance = MBeanRegistry.getInstance();
        instance.unregister(jmxQuorumBean);
        instance.unregister(jmxLocalPeerBean);

        for (RemotePeerBean remotePeerBean : jmxRemotePeerBean.values()) {
            instance.unregister(remotePeerBean);
        }

        jmxQuorumBean = null;
        jmxLocalPeerBean = null;
        jmxRemotePeerBean = null;
    }
}
  1. lookForLeader(),顾名思义,就是“找老大”的方法:循环执行,直到节点的状态发生变化(从LOOKING变为FOLLOWING或者LEADING)。对应源码:FastLeaderElection.java
java
/**
 * Runs one leader election (abridged excerpt; {@code ......} marks code omitted by the article).
 *
 * <p>Bumps the logical clock, proposes this server, broadcasts the proposal and then keeps
 * processing notifications from {@code recvqueue} while the peer is {@code LOOKING} and the
 * election has not been stopped.
 *
 * @return the final vote once a leader has been agreed on, or {@code null} if the loop
 *         ended without a result
 * @throws InterruptedException if interrupted while waiting on the notification queue
 */
public Vote lookForLeader() throws InterruptedException {
    ......

    self.start_fle = Time.currentElapsedTime();
    try {
        // Latest valid vote received from each server during this election
        // (during a normal start-up every other server sends us its vote)
        Map<Long, Vote> recvset = new HashMap<Long, Vote>();
        // Votes that fall outside the regular election
        // (passed to the FOLLOWING / LEADING notification handlers below)
        Map<Long, Vote> outofelection = new HashMap<Long, Vote>();
        // Wait time for one polling round; the original note gives 0.2s as the default
        // (value of minNotificationInterval is not visible here — verify)
        int notTimeout = minNotificationInterval;
        // Each new election round increments logicalclock; per the original note it stands in
        // for the epoch until valid epoch data exists.
        // Leader selection rule (per the original note): compare epoch, then zxid (transaction id),
        // then server id (myid); the larger one wins.
        synchronized (this) {
            // Advance the logical clock; every election does this
            logicalclock.incrementAndGet();
            // Update the proposal (serverid, zxid, epoch) to this server's own values
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }
        // Broadcast the vote: send our own proposal to the other servers
        sendNotifications();

        SyncedLearnerTracker voteSet = null;
        // Keep going round after round until the election succeeds
        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
            /*
                * Remove next notification from queue, times out after 2 times
                * the termination time
                */
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

            /*
                * Sends more notifications if haven't received enough.
                * Otherwise processes new notification.
                */
            if (n == null) {
                if (manager.haveDelivered()) {
                    sendNotifications();
                } else {
                    manager.connectAll();
                }

                // Double the wait, capped at maxNotificationInterval
                notTimeout = Math.min(notTimeout << 1, maxNotificationInterval);

                if (self.getQuorumVerifier() instanceof QuorumOracleMaj
                        && self.getQuorumVerifier().revalidateVoteset(voteSet, notTimeout != minNotificationInterval)) {
                    setPeerState(proposedLeader, voteSet);
                    Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                    leaveInstance(endVote);
                    return endVote;
                }

                LOG.info("Notification time out: {} ms", notTimeout);

            } else if (validVoter(n.sid) && validVoter(n.leader)) {
                /*
                    * Only proceed if the vote comes from a replica in the current or next
                    * voting view for a replica in the current or next voting view.
                    */
                switch (n.state) {
                case LOOKING:
                    if (getInitLastLoggedZxid() == -1) {
                        LOG.debug("Ignoring notification as our zxid is -1");
                        break;
                    }
                    if (n.zxid == -1) {
                        LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
                        break;
                    }
                    // If notification > current, replace and send messages out
                    if (n.electionEpoch > logicalclock.get()) {
                        // The sender is in a newer round: adopt its clock and restart vote collection
                        logicalclock.set(n.electionEpoch);
                        recvset.clear();
                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                        }
                        sendNotifications();
                    } else if (n.electionEpoch < logicalclock.get()) {
                            // The sender is in an older round: ignore the notification
                            LOG.debug(
                                "Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}",
                                Long.toHexString(n.electionEpoch),
                                Long.toHexString(logicalclock.get()));
                        break;
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                        // Same round and the received proposal wins: adopt and re-broadcast it
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    LOG.debug(
                        "Adding vote: from={}, proposed leader={}, proposed zxid=0x{}, proposed election epoch=0x{}",
                        n.sid,
                        n.leader,
                        Long.toHexString(n.zxid),
                        Long.toHexString(n.electionEpoch));

                    // don't care about the version if it's in LOOKING state
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                    voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));

                    if (voteSet.hasAllQuorums()) {

                        // Verify if there is any change in the proposed leader
                        while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                // A better proposal arrived: put it back for the main loop to process
                                recvqueue.put(n);
                                break;
                            }
                        }

                        // n == null means the queue stayed quiet for finalizeWait ms: finish
                        if (n == null) {
                            setPeerState(proposedLeader, voteSet);
                            Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    LOG.debug("Notification from observer: {}", n.sid);
                    break;

                case FOLLOWING:
                    /*
                    * To avoid duplicate codes
                    * */
                    Vote resultFN = receivedFollowingNotification(recvset, outofelection, voteSet, n);
                    if (resultFN == null) {
                        break;
                    } else {
                        return resultFN;
                    }
                case LEADING:
                    /*
                    * In leadingBehavior(), it performs followingBehvior() first. When followingBehavior() returns
                    * a null pointer, ask Oracle whether to follow this leader.
                    * */
                    Vote resultLN = receivedLeadingNotification(recvset, outofelection, voteSet, n);
                    if (resultLN == null) {
                        break;
                    } else {
                        return resultLN;
                    }
                default:
                    LOG.warn("Notification state unrecognized: {} (n.state), {}(n.sid)", n.state, n.sid);
                    break;
                }
            } else {
                if (!validVoter(n.leader)) {
                    LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                }
                if (!validVoter(n.sid)) {
                    LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                }
            }
        }
        return null;
    } finally {
        // Always unregister the election JMX bean, whatever the outcome
        try {
            if (self.jmxLeaderElectionBean != null) {
                MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
            }
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        self.jmxLeaderElectionBean = null;
        LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
    }
}
  1. 广播选票,把自己的选票发给其他服务器
java
/**
 * Broadcasts the current proposal: for every voter of the current and next configuration,
 * one notification carrying the proposed leader, zxid, logical clock and epoch is queued
 * on {@code sendqueue}.
 */
private void sendNotifications() {
    // One notification per participating voter
    for (long recipient : self.getCurrentAndNextConfigVoters()) {
        QuorumVerifier verifier = self.getQuorumVerifier();
        // Build the outgoing vote; it always advertises the LOOKING state
        ToSend ballot = new ToSend(
            ToSend.mType.notification,
            proposedLeader,
            proposedZxid,
            logicalclock.get(),
            QuorumPeer.ServerState.LOOKING,
            recipient,
            proposedEpoch,
            verifier.toString().getBytes(UTF_8));

        LOG.debug(
            "Sending Notification: {} (n.leader), 0x{} (n.zxid), 0x{} (n.round), {} (recipient),"
                + " {} (myid), 0x{} (n.peerEpoch) ",
            proposedLeader,
            Long.toHexString(proposedZxid),
            Long.toHexString(logicalclock.get()),
            recipient,
            self.getMyId(),
            Long.toHexString(proposedEpoch));

        // Hand the vote over to the send queue
        sendqueue.offer(ballot);
    }
}
  1. WorkerSender线程一直等待选票
    FastLeaderElection.java
java
/**
 * Thread that drains {@code sendqueue} and passes each queued vote to the connection manager.
 */
class WorkerSender extends ZooKeeperThread {
    volatile boolean stop;
    QuorumCnxManager manager;

    WorkerSender(QuorumCnxManager manager) {
        super("WorkerSender");
        this.manager = manager;
        this.stop = false;
    }

    /**
     * Polls the send queue in 3-second slices until {@code stop} is set or the thread
     * is interrupted, processing every message obtained.
     */
    public void run() {
        while (!stop) {
            final ToSend outgoing;
            try {
                // Wait (up to 3 s) for the next vote to send
                outgoing = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                break;
            }
            if (outgoing != null) {
                process(outgoing);
            }
        }
        LOG.info("WorkerSender is down");
    }

    /**
     * Called by run() once there is a new message to send.
     */
    void process(ToSend m) {
        ByteBuffer payload = buildMsg(m.state.ordinal(), m.leader, m.zxid, m.electionEpoch, m.peerEpoch, m.configData);
        // Send the vote via the connection manager
        manager.toSend(m.sid, payload);
    }
}
  1. 将要发送出去的消息交给发送队列处理,实现和具体发送逻辑解耦
java
/**
 * Routes an outgoing message.
 *
 * <p>A message addressed to this server is placed straight on the local receive queue.
 * Any other message is appended to the destination's send queue (created on first use),
 * after which a connection to that server is requested.
 *
 * @param sid id of the destination server
 * @param b   the message bytes
 */
public void toSend(Long sid, ByteBuffer b) {
    if (this.mySid != sid) {
        // Destined for another server: enqueue on its send queue, creating the queue if needed
        BlockingQueue<ByteBuffer> outbound = queueSendMap.computeIfAbsent(sid, serverId -> new CircularBlockingQueue<>(SEND_CAPACITY));
        addToSendQueue(outbound, b);
        // Establish the network connection; sid is the other server's myid
        connectOne(sid);
        return;
    }

    // Addressed to ourselves: loop it back into our own receive queue, nothing goes on the wire
    b.position(0);
    addToRecvQueue(new Message(b.duplicate(), sid));
}
  1. 如果数据是发送给自己的,添加到自己的接收队列
java
 /**
  * Offers a message to the receive queue.
  *
  * @param msg the message to enqueue
  * @throws RuntimeException if the queue does not accept the message
  */
 public void addToRecvQueue(final Message msg) {
    if (this.recvQueue.offer(msg)) {
        return;
    }
    throw new RuntimeException("Could not insert into receive queue");
}
  1. 数据添加到发送队列
java
/**
 * Offers a message to the given send queue.
 *
 * @param queue  the destination server's send queue
 * @param buffer the message bytes to enqueue
 * @throws RuntimeException if the queue does not accept the message
 */
private void addToSendQueue(final BlockingQueue<ByteBuffer> queue, final ByteBuffer buffer) {
    // Append the outgoing message to the send queue
    final boolean success = queue.offer(buffer);
    if (!success) {
        // The message previously named the receive queue, which misreports where the failure occurred
        throw new RuntimeException("Could not insert into send queue");
    }
}
  1. 与要发送的服务器节点建立通信连接
java
/**
 * Tries to connect to the server with id {@code sid} unless a sender worker already
 * exists for it.
 *
 * <p>The election address is looked up first in the last committed view and then, if
 * that did not lead to a connection attempt being accepted, in the view of the last
 * seen quorum verifier (only when that verifier exists, and the server is either unknown
 * so far or has a different election address there). An id found in neither view is
 * reported as invalid.
 *
 * @param sid id of the server to connect to
 */
synchronized void connectOne(long sid) {
    if (senderWorkerMap.get(sid) != null) {
        LOG.debug("There is a connection already for server {}", sid);
        if (self.isMultiAddressEnabled() && self.isMultiAddressReachabilityCheckEnabled()) {
            // since ZOOKEEPER-3188 we can use multiple election addresses to reach a server. It is possible, that the
            // one we are using is already dead and we need to clean-up, so when we will create a new connection
            // then we will choose an other one, which is actually reachable
            senderWorkerMap.get(sid).asyncValidateIfSocketIsStillReachable();
        }
        return;
    }
    synchronized (self.QV_LOCK) {
        boolean knownId = false;
        // Resolve hostname for the remote server before attempting to
        // connect in case the underlying ip address has changed.
        self.recreateSocketAddresses(sid);
        Map<Long, QuorumPeer.QuorumServer> lastCommittedView = self.getView();
        QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
        if (lastCommittedView.containsKey(sid)) {
            knownId = true;
            LOG.debug("Server {} knows {} already, it is in the lastCommittedView", self.getMyId(), sid);
            if (connectOne(sid, lastCommittedView.get(sid).electionAddr)) {
                return;
            }
        }
        // Dereference the last-seen verifier only after the null check; previously
        // getAllMembers() was called before lastSeenQV was tested for null.
        if (lastSeenQV != null) {
            Map<Long, QuorumPeer.QuorumServer> lastProposedView = lastSeenQV.getAllMembers();
            if (lastProposedView.containsKey(sid)
                && (!knownId
                    || !lastProposedView.get(sid).electionAddr.equals(lastCommittedView.get(sid).electionAddr))) {
                knownId = true;
                LOG.debug("Server {} knows {} already, it is in the lastProposedView", self.getMyId(), sid);

                if (connectOne(sid, lastProposedView.get(sid).electionAddr)) {
                    return;
                }
            }
        }
        if (!knownId) {
            LOG.warn("Invalid server id: {} ", sid);
        }
    }
}
  1. 实际连接远程的zk节点进行通信connectOne()方法
java
/**
 * Connects to server {@code sid} at the given election address(es) unless a sender
 * worker already exists for it.
 *
 * @param sid          id of the server to connect to
 * @param electionAddr election address(es) of that server
 * @return {@code true} if a connection already exists; otherwise the result of
 *         {@code initiateConnectionAsync}
 */
synchronized boolean connectOne(long sid, MultipleAddresses electionAddr) {
    if (senderWorkerMap.get(sid) == null) {
        // we are doing connection initiation always asynchronously, since it is possible that
        // the socket connection timeouts or the SSL handshake takes too long and don't want
        // to keep the rest of the connections to wait
        // First contact with this server: initialize the socket connection
        return initiateConnectionAsync(electionAddr, sid);
    }

    // A connection was set up earlier; reuse it
    LOG.debug("There is a connection already for server {}", sid);
    boolean checkReachability = self.isMultiAddressEnabled()
        && electionAddr.size() > 1
        && self.isMultiAddressReachabilityCheckEnabled();
    if (checkReachability) {
        // since ZOOKEEPER-3188 we can use multiple election addresses to reach a server. It is possible, that the
        // one we are using is already dead and we need to clean-up, so when we will create a new connection
        // then we will choose an other one, which is actually reachable
        senderWorkerMap.get(sid).asyncValidateIfSocketIsStillReachable();
    }
    return true;
}
  1. 异步进行创建连接
java
/**
 * Submits a connection request for server {@code sid} to the connection executor.
 *
 * @param electionAddr election address(es) of the target server
 * @param sid          id of the target server
 * @return {@code true} if a request for {@code sid} is already in progress or the new
 *         request was submitted; {@code false} if submitting it failed
 */
public boolean initiateConnectionAsync(final MultipleAddresses electionAddr, final Long sid) {
    final boolean newlyTracked = inprogressConnections.add(sid);
    if (!newlyTracked) {
        // simply return as there is a connection request to
        // server 'sid' already in progress.
        LOG.debug("Connection request to server id: {} is already in progress, so skipping this request", sid);
        return true;
    }

    boolean submitted = true;
    try {
        connectionExecutor.execute(new QuorumConnectionReqThread(electionAddr, sid));
        connectionThreadCnt.incrementAndGet();
    } catch (Throwable failure) {
        // Imp: Safer side catching all type of exceptions and remove 'sid'
        // from inprogress connections. This is to avoid blocking further
        // connection requests from this 'sid' in case of errors.
        inprogressConnections.remove(sid);
        LOG.error("Exception while submitting quorum connection request", failure);
        submitted = false;
    }
    return submitted;
}
  1. QuorumConnectionReqThread的run()方法 QuorumConnectionReqThread.java
java
/**
 * Task that opens the connection to one server and, when done, removes that
 * server's id from the set of in-progress connection requests.
 */
private class QuorumConnectionReqThread extends ZooKeeperThread {
    // Election address(es) of the target server
    final MultipleAddresses electionAddr;
    // Id of the target server
    final Long sid;
    QuorumConnectionReqThread(final MultipleAddresses electionAddr, final Long sid) {
        super("QuorumConnectionReqThread-" + sid);
        this.electionAddr = electionAddr;
        this.sid = sid;
    }

    @Override
    public void run() {
        try {
            initiateConnection(electionAddr, sid);
        } finally {
            // Always clear the in-progress marker so later requests for this sid are not skipped
            inprogressConnections.remove(sid);
        }
    }
}
  1. 初始化socket连接
java
/**
 * Opens a socket to server {@code sid} (an SSL socket when {@code self.isSslQuorum()}
 * is true, otherwise one from {@code SOCKET_FACTORY}), connects it with timeout
 * {@code cnxTO}, and then hands it to {@link #startConnection}.
 *
 * <p>Failures are logged and the socket is closed; nothing is thrown to the caller.
 *
 * @param electionAddr election address(es) of the remote server
 * @param sid          id of the remote server
 */
public void initiateConnection(final MultipleAddresses electionAddr, final Long sid) {
    Socket sock = null;
    try {
        LOG.debug("Opening channel to server {}", sid);
        sock = self.isSslQuorum() ? self.getX509Util().createSSLSocket() : SOCKET_FACTORY.get();
        setSockOpts(sock);
        sock.connect(electionAddr.getReachableOrOne(), cnxTO);

        // For SSL sockets, run the handshake now so a failure surfaces here.
        if (sock instanceof SSLSocket) {
            SSLSocket secureSock = (SSLSocket) sock;
            secureSock.startHandshake();
            LOG.info("SSL handshake complete with {} - {} - {}",
                        secureSock.getRemoteSocketAddress(),
                        secureSock.getSession().getProtocol(),
                        secureSock.getSession().getCipherSuite());
        }

        LOG.debug("Connected to server {} using election address: {}:{}",
                    sid, sock.getInetAddress(), sock.getPort());
    } catch (X509Exception e) {
        // Could not create the secure socket.
        LOG.warn("Cannot open secure channel to {} at election address {}", sid, electionAddr, e);
        closeSocket(sock);
        return;
    } catch (UnresolvedAddressException | IOException e) {
        // Address resolution, connect or handshake failed.
        LOG.warn("Cannot open channel to {} at election address {}", sid, electionAddr, e);
        closeSocket(sock);
        return;
    }

    // The socket is connected: exchange the initial message and start the workers.
    try {
        startConnection(sock, sid);
    } catch (IOException e) {
        LOG.error(
            "Exception while connecting, id: {}, addr: {}, closing learner connection",
            sid,
            sock.getRemoteSocketAddress(),
            e);
        closeSocket(sock);
    }
}
  1. 创建并启动发送器线程和接收器线程
java
/**
 * Sends the initial message (protocol version, own server id, own election address(es))
 * over {@code sock}, authenticates against the remote server, and - unless the remote
 * server id is larger than ours - creates and starts the {@code SendWorker} /
 * {@code RecvWorker} pair for this connection.
 *
 * @param sock connected socket to the remote server
 * @param sid  id of the remote server
 * @return {@code true} if the workers were started; {@code false} if the initial
 *         message could not be written or the connection was dropped because the
 *         remote id is larger than ours
 * @throws IOException if thrown by {@code authLearner.authenticate}
 */
private boolean startConnection(Socket sock, Long sid) throws IOException {
    DataOutputStream out = null;
    DataInputStream in = null;
    LOG.debug("startConnection (myId:{} --> sid:{})", self.getMyId(), sid);
    try {
        // A BufferedOutputStream reduces the number of IP packets, which matters
        // for cross-datacenter deployments.
        out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream()));

        // Sending id and challenge.
        //
        // The protocol version (i.e. the message type) goes first. For backward
        // compatibility the old version is used unless the MultiAddress feature is
        // enabled: during a rolling upgrade every server must understand the version
        // we speak, otherwise the ensemble may partition. See ZOOKEEPER-3720.
        long protocolVersion = self.isMultiAddressEnabled() ? PROTOCOL_VERSION_V2 : PROTOCOL_VERSION_V1;
        out.writeLong(protocolVersion);
        out.writeLong(self.getMyId());

        // Next comes our election address; the new protocol version may carry several.
        Collection<InetSocketAddress> addressesToSend;
        if (protocolVersion == PROTOCOL_VERSION_V2) {
            addressesToSend = self.getElectionAddress().getAllAddresses();
        } else {
            addressesToSend = Arrays.asList(self.getElectionAddress().getOne());
        }

        String joinedAddresses = addressesToSend.stream()
                .map(NetUtils::formatInetAddr)
                .collect(Collectors.joining("|"));
        byte[] addrBytes = joinedAddresses.getBytes();
        out.writeInt(addrBytes.length);
        out.write(addrBytes);
        out.flush();

        // Input stream that the RecvWorker will read the peer's messages from.
        in = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
    } catch (IOException e) {
        LOG.warn("Ignoring exception reading or writing challenge: ", e);
        closeSocket(sock);
        return false;
    }

    // authenticate learner
    QuorumPeer.QuorumServer remoteServer = self.getVotingView().get(sid);
    if (remoteServer != null) {
        // TODO - investigate why reconfig makes qps null.
        authLearner.authenticate(sock, remoteServer.hostname);
    }

    // Lost the challenge: the remote id is larger than ours, so this server must not
    // keep a connection it initiated - close our side and report failure.
    if (sid > self.getMyId()) {
        LOG.info("Have smaller server identifier, so dropping the connection: (myId:{} --> sid:{})", self.getMyId(), sid);
        closeSocket(sock);
        return false;
    }

    // Won the challenge: keep the connection and set up its sender and receiver.
    LOG.debug("Have larger server identifier, so keeping the connection: (myId:{} --> sid:{})", self.getMyId(), sid);
    SendWorker sender = new SendWorker(sock, sid);
    RecvWorker receiver = new RecvWorker(sock, in, sid, sender);
    sender.setRecv(receiver);

    // Retire a sender left over from a previous connection to the same server.
    SendWorker previousSender = senderWorkerMap.get(sid);
    if (previousSender != null) {
        previousSender.finish();
    }
    senderWorkerMap.put(sid, sender);

    queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));

    // Start the sender thread and the receiver thread.
    sender.start();
    receiver.start();
    return true;
}
  1. SendWorker类,用于发送消息,查看run()方法
java
/**
 * Sender loop for one peer: first re-sends the last message if the send queue is empty,
 * then keeps polling the peer's send queue and writing each message to the socket until
 * the worker is stopped, and finally calls {@code finish()}.
 */
@Override
public void run() {
    threadCnt.incrementAndGet();
    try {
        /*
         * With nothing queued, re-send lastMessage so that we know the peer received
         * it: the message may have been dropped if either side closed its connection
         * (and left the thread) before reading/processing it. The peer handles
         * duplicates correctly.
         *
         * With a non-empty queue there is something more recent than lastMessage,
         * so the stale message is not sent and the queue is served instead.
         */
        BlockingQueue<ByteBuffer> pending = queueSendMap.get(sid);
        if (pending == null || isSendQueueEmpty(pending)) {
            ByteBuffer last = lastMessageSent.get(sid);
            if (last != null) {
                LOG.debug("Attempting to send lastMessage to sid={}", sid);
                send(last);
            }
        }
    } catch (IOException e) {
        LOG.error("Failed to send last message. Shutting down thread.", e);
        this.finish();
    }
    LOG.debug("SendWorker thread started towards {}. myId: {}", sid, QuorumCnxManager.this.mySid);

    try {
        // Keep going for as long as the connection is alive.
        while (running && !shutdown && sock != null) {
            try {
                BlockingQueue<ByteBuffer> queue = queueSendMap.get(sid);
                if (queue == null) {
                    LOG.error("No queue of incoming messages for server {}", sid);
                    break;
                }

                // Wait up to one second for the next outgoing message.
                ByteBuffer next = pollSendQueue(queue, 1000, TimeUnit.MILLISECONDS);
                if (next != null) {
                    // Remember it as the most recent message for this server, then send.
                    lastMessageSent.put(sid, next);
                    send(next);
                }
            } catch (InterruptedException e) {
                LOG.warn("Interrupted while waiting for message on queue", e);
            }
        }
    } catch (Exception e) {
        LOG.warn(
            "Exception when using channel: for id {} my id = {}",
            sid ,
            QuorumCnxManager.this.mySid,
            e);
    }
    this.finish();

    LOG.warn("Send worker leaving thread id {} my id = {}", sid, self.getMyId());
}
  1. 发送字节流(SendWorker 是 QuorumCnxManager.java 中的内部类)
java
/**
 * Writes one message to the peer as a 4-byte length prefix followed by the payload,
 * then flushes the stream.
 *
 * <p>The payload is the buffer's content from position 0 up to its capacity. The bytes
 * are copied out through {@link ByteBuffer#get(byte[])} and that copy is what gets
 * written. Writing {@code b.array()} instead would ignore {@code arrayOffset()}, could
 * emit more bytes than the announced length when the backing array is larger than the
 * buffer, and would throw for buffers without an accessible backing array.
 *
 * @param b message to send; its position is moved by this call
 * @throws IOException if writing to or flushing the output stream fails
 */
synchronized void send(ByteBuffer b) throws IOException {
    byte[] msgBytes = new byte[b.capacity()];
    try {
        b.position(0);
        b.get(msgBytes);
    } catch (BufferUnderflowException be) {
        // The buffer's limit is below its capacity: there is no complete message to send.
        LOG.error("BufferUnderflowException ", be);
        return;
    }
    // Length prefix and payload both come from the validated copy, so they always agree.
    dout.writeInt(msgBytes.length);
    dout.write(msgBytes);
    dout.flush();
}
  1. RecvWorker 类的 run() 方法,接收数据流
    (RecvWorker 是 QuorumCnxManager.java 中的内部类)
java
/**
 * Receiver loop for one peer: reads length-prefixed messages from the input stream and
 * puts each one on the receive queue until the worker is stopped or an exception occurs.
 * On exit it finishes the paired {@code SendWorker} and closes the socket.
 */
@Override
public void run() {
    threadCnt.incrementAndGet();
    try {
        LOG.debug("RecvWorker thread towards {} started. myId: {}", sid, QuorumCnxManager.this.mySid);
        // Keep going for as long as the connection is alive.
        while (running && !shutdown && sock != null) {
            // The first int of every message is the payload length.
            int length = din.readInt();
            boolean lengthInRange = length > 0 && length <= PACKETMAXSIZE;
            if (!lengthInRange) {
                throw new IOException("Received packet with invalid packet: " + length);
            }

            // Read exactly 'length' payload bytes into a fresh array.
            final byte[] payload = new byte[length];
            din.readFully(payload);

            // Hand the peer's message over to the receive queue.
            addToRecvQueue(new Message(ByteBuffer.wrap(payload), sid));
        }
    } catch (Exception e) {
        LOG.warn(
            "Connection broken for id {}, my id = {}",
            sid,
            QuorumCnxManager.this.mySid,
            e);
    } finally {
        LOG.warn("Interrupting SendWorker thread from RecvWorker. sid: {}. myId: {}", sid, QuorumCnxManager.this.mySid);
        sw.finish();
        closeSocket(sock);
    }
}
  1. 将接收到的消息,放入接收消息队列
java
/**
 * Offers a received message to {@code recvQueue}.
 *
 * @param msg message to enqueue
 * @throws RuntimeException if the queue rejects the message
 */
public void addToRecvQueue(final Message msg) {
    if (!this.recvQueue.offer(msg)) {
        throw new RuntimeException("Could not insert into receive queue");
    }
}
  1. WorkerReceiver线程
java
public void run() {

Message response;
while (!stop) {
    // Sleeps on receive
    try {
        // 从 RecvQueue 中取出选举投票消息(其他服务器发送过来的)
        response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
        if (response == null) {
            continue;
        }

        final int capacity = response.buffer.capacity();

        // The current protocol and two previous generations all send at least 28 bytes
        if (capacity < 28) {
            LOG.error("Got a short response from server {}: {}", response.sid, capacity);
            continue;
        }

        // this is the backwardCompatibility mode in place before ZK-107
        // It is for a version of the protocol in which we didn't send peer epoch
        // With peer epoch and version the message became 40 bytes
        boolean backCompatibility28 = (capacity == 28);

        // this is the backwardCompatibility mode for no version information
        boolean backCompatibility40 = (capacity == 40);

        response.buffer.clear();

        // Instantiate Notification and set its attributes
        Notification n = new Notification();

        int rstate = response.buffer.getInt();
        long rleader = response.buffer.getLong();
        long rzxid = response.buffer.getLong();
        long relectionEpoch = response.buffer.getLong();
        long rpeerepoch;

        int version = 0x0;
        QuorumVerifier rqv = null;

        try {
            if (!backCompatibility28) {
                rpeerepoch = response.buffer.getLong();
                if (!backCompatibility40) {
                    /*
                        * Version added in 3.4.6
                        */

                    version = response.buffer.getInt();
                } else {
                    LOG.info("Backward compatibility mode (36 bits), server id: {}", response.sid);
                }
            } else {
                LOG.info("Backward compatibility mode (28 bits), server id: {}", response.sid);
                rpeerepoch = ZxidUtils.getEpochFromZxid(rzxid);
            }

            // check if we have a version that includes config. If so extract config info from message.
            if (version > 0x1) {
                int configLength = response.buffer.getInt();

                // we want to avoid errors caused by the allocation of a byte array with negative length
                // (causing NegativeArraySizeException) or huge length (causing e.g. OutOfMemoryError)
                if (configLength < 0 || configLength > capacity) {
                    throw new IOException(String.format("Invalid configLength in notification message! sid=%d, capacity=%d, version=%d, configLength=%d",
                                                        response.sid, capacity, version, configLength));
                }

                byte[] b = new byte[configLength];
                response.buffer.get(b);

                synchronized (self) {
                    try {
                        rqv = self.configFromString(new String(b, UTF_8));
                        QuorumVerifier curQV = self.getQuorumVerifier();
                        if (rqv.getVersion() > curQV.getVersion()) {
                            LOG.info("{} Received version: {} my version: {}",
                                        self.getMyId(),
                                        Long.toHexString(rqv.getVersion()),
                                        Long.toHexString(self.getQuorumVerifier().getVersion()));
                            if (self.getPeerState() == ServerState.LOOKING) {
                                LOG.debug("Invoking processReconfig(), state: {}", self.getServerState());
                                self.processReconfig(rqv, null, null, false);
                                if (!rqv.equals(curQV)) {
                                    LOG.info("restarting leader election");
                                    self.shuttingDownLE = true;
                                    self.getElectionAlg().shutdown();

                                    break;
                                }
                            } else {
                                LOG.debug("Skip processReconfig(), state: {}", self.getServerState());
                            }
                        }
                    } catch (IOException | ConfigException e) {
                        LOG.error("Something went wrong while processing config received from {}", response.sid);
                    }
                }
            } else {
                LOG.info("Backward compatibility mode (before reconfig), server id: {}", response.sid);
            }
        } catch (BufferUnderflowException | IOException e) {
            LOG.warn("Skipping the processing of a partial / malformed response message sent by sid={} (message length: {})",
                        response.sid, capacity, e);
            continue;
        }
        /*
            * If it is from a non-voting server (such as an observer or
            * a non-voting follower), respond right away.
            */
        if (!validVoter(response.sid)) {
            Vote current = self.getCurrentVote();
            QuorumVerifier qv = self.getQuorumVerifier();
            ToSend notmsg = new ToSend(
                ToSend.mType.notification,
                current.getId(),
                current.getZxid(),
                logicalclock.get(),
                self.getPeerState(),
                response.sid,
                current.getPeerEpoch(),
                qv.toString().getBytes(UTF_8));

            sendqueue.offer(notmsg);
        } else {
            // Receive new message
            LOG.debug("Receive new notification message. My id = {}", self.getMyId());

            // State of peer that sent this message
            QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
            switch (rstate) {
            case 0:
                ackstate = QuorumPeer.ServerState.LOOKING;
                break;
            case 1:
                ackstate = QuorumPeer.ServerState.FOLLOWING;
                break;
            case 2:
                ackstate = QuorumPeer.ServerState.LEADING;
                break;
            case 3:
                ackstate = QuorumPeer.ServerState.OBSERVING;
                break;
            default:
                continue;
            }

            n.leader = rleader;
            n.zxid = rzxid;
            n.electionEpoch = relectionEpoch;
            n.state = ackstate;
            n.sid = response.sid;
            n.peerEpoch = rpeerepoch;
            n.version = version;
            n.qv = rqv;
            /*
                * Print notification info
                */
            LOG.info(
                "Notification: my state:{}; n.sid:{}, n.state:{}, n.leader:{}, n.round:0x{}, "
                    + "n.peerEpoch:0x{}, n.zxid:0x{}, message format version:0x{}, n.config version:0x{}",
                self.getPeerState(),
                n.sid,
                n.state,
                n.leader,
                Long.toHexString(n.electionEpoch),
                Long.toHexString(n.peerEpoch),
                Long.toHexString(n.zxid),
                Long.toHexString(n.version),
                (n.qv != null ? (Long.toHexString(n.qv.getVersion())) : "0"));

            /*
                * If this server is looking, then send proposed leader
                */

            if (self.getPeerState() == QuorumPeer.ServerState.LOOKING) {
                recvqueue.offer(n);

                /*
                    * Send a notification back if the peer that sent this
                    * message is also looking and its logical clock is
                    * lagging behind.
                    */
                if ((ackstate == QuorumPeer.ServerState.LOOKING)
                    && (n.electionEpoch < logicalclock.get())) {
                    Vote v = getVote();
                    QuorumVerifier qv = self.getQuorumVerifier();
                    ToSend notmsg = new ToSend(
                        ToSend.mType.notification,
                        v.getId(),
                        v.getZxid(),
                        logicalclock.get(),
                        self.getPeerState(),
                        response.sid,
                        v.getPeerEpoch(),
                        qv.toString().getBytes());
                    sendqueue.offer(notmsg);
                }
            } else {
                /*
                    * If this server is not looking, but the one that sent the ack
                    * is looking, then send back what it believes to be the leader.
                    */
                Vote current = self.getCurrentVote();
                if (ackstate == QuorumPeer.ServerState.LOOKING) {
                    if (self.leader != null) {
                        if (leadingVoteSet != null) {
                            self.leader.setLeadingVoteSet(leadingVoteSet);
                            leadingVoteSet = null;
                        }
                        self.leader.reportLookingSid(response.sid);
                    }


                    LOG.debug(
                        "Sending new notification. My id ={} recipient={} zxid=0x{} leader={} config version = {}",
                        self.getMyId(),
                        response.sid,
                        Long.toHexString(current.getZxid()),
                        current.getId(),
                        Long.toHexString(self.getQuorumVerifier().getVersion()));

                    QuorumVerifier qv = self.getQuorumVerifier();
                    ToSend notmsg = new ToSend(
                        ToSend.mType.notification,
                        current.getId(),
                        current.getZxid(),
                        current.getElectionEpoch(),
                        self.getPeerState(),
                        response.sid,
                        current.getPeerEpoch(),
                        qv.toString().getBytes());
                    sendqueue.offer(notmsg);
                }
            }
        }
    } catch (InterruptedException e) {
        LOG.warn("Interrupted Exception while waiting for new message", e);
    }
}