
Source Code Walkthrough: State Synchronization Between Follower and Leader

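When the election ends, every node must update its state according to its role: the elected Leader sets its state to Leader, and every other node sets its state to Follower.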

1. Follower and Leader State Synchronization Flow


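Summary of the synchronization strategies

  1. DIFF: the follower and the leader hold the same data, so nothing needs to be replayed.
  2. TRUNC: the follower's zxid is larger than the leader's, so the follower must roll back (truncate).
  3. COMMIT: the leader's zxid is larger than the follower's, so the leader sends the missing Proposals to the follower to be committed and applied (on the wire this case is also announced with a DIFF packet, followed by PROPOSAL/COMMIT pairs).
  4. SNAP: if the follower has no data at all, or lags too far behind for the leader to replay the missing proposals, the leader synchronizes with SNAP and serializes its entire data set to the follower.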
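To make the cases concrete, here is a minimal sketch of how a leader could map a follower's last zxid to a sync mode. It is a simplification, not ZooKeeper's `LearnerHandler.syncFollower()`: it assumes the leader keeps an in-memory committed log spanning `minCommittedLog..maxCommittedLog`, ignores replay from the on-disk transaction log, and models the "COMMIT" case as a DIFF that replays proposals. `SyncMode` and `SyncModeChooser` are hypothetical names.

```java
// Hypothetical simplification of the leader's sync-mode decision.
// Not ZooKeeper's LearnerHandler.syncFollower(); names are illustrative.
enum SyncMode { DIFF, TRUNC, SNAP }

final class SyncModeChooser {
    /**
     * @param peerLastZxid    last zxid the follower reported
     * @param minCommittedLog oldest zxid still held in the leader's in-memory committed log
     * @param maxCommittedLog newest zxid committed on the leader
     */
    static SyncMode choose(long peerLastZxid, long minCommittedLog, long maxCommittedLog) {
        if (peerLastZxid == maxCommittedLog) {
            return SyncMode.DIFF;  // identical state: an empty DIFF, nothing to replay
        }
        if (peerLastZxid > maxCommittedLog) {
            return SyncMode.TRUNC; // follower is ahead: it must roll back to maxCommittedLog
        }
        if (peerLastZxid >= minCommittedLog) {
            return SyncMode.DIFF;  // behind but inside the log: replay PROPOSAL/COMMIT pairs
        }
        return SyncMode.SNAP;      // empty or too far behind: send a full snapshot
    }

    public static void main(String[] args) {
        long min = 0x500000001L; // epoch 5, counter 1
        long max = 0x500000009L; // epoch 5, counter 9
        System.out.println(choose(max, min, max));          // DIFF
        System.out.println(choose(0x50000000aL, min, max)); // TRUNC
        System.out.println(choose(0x500000004L, min, max)); // DIFF
        System.out.println(choose(0L, min, max));           // SNAP
    }
}
```

The real code additionally handles cases such as a follower whose last zxid is not in the leader's log (truncate, then diff), which this sketch deliberately leaves out.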

2. Follower and Leader State Synchronization Source Code

Entry point for the Leader's state update: leader.lead()
Entry point for the Follower's state update: follower.followLeader()

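  1. The follower must let the leader know its state: epoch, zxid, and sid.
  • It must find out which node is the leader;
  • It opens a connection to the leader;
  • It sends its own information to the leader.
  2. On receiving this information, the leader must reply to the follower with the corresponding information.
  3. Once the leader knows the follower's state, it decides which synchronization method to use (DIFF, TRUNC, or SNAP) and performs the synchronization.
  4. When the leader has received ACKs from a quorum (more than half of the cluster), it enters the normal working state, and cluster startup is complete.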
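The last step hinges on a majority check. A minimal sketch, assuming a simple majority quorum over a fixed number of voting members in which the leader's own ACK counts; `QuorumAckTracker` is a hypothetical class, and in ZooKeeper itself the check goes through its `QuorumVerifier`.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical majority tracker: counts distinct server ids that have ACKed
// and reports when more than half of the voting members have done so.
final class QuorumAckTracker {
    private final int votingMembers;
    private final Set<Long> ackedSids = new HashSet<>();

    QuorumAckTracker(int votingMembers) {
        this.votingMembers = votingMembers;
    }

    /** Records an ACK from sid and returns true once a majority has acked. */
    boolean ack(long sid) {
        ackedSids.add(sid); // repeated ACKs from the same sid count once
        return hasQuorum();
    }

    boolean hasQuorum() {
        return ackedSids.size() > votingMembers / 2;
    }

    public static void main(String[] args) {
        QuorumAckTracker tracker = new QuorumAckTracker(5);
        System.out.println(tracker.ack(1)); // false (leader's own ACK)
        System.out.println(tracker.ack(2)); // false
        System.out.println(tracker.ack(2)); // false (duplicate)
        System.out.println(tracker.ack(3)); // true (3 of 5)
    }
}
```

Counting distinct sids rather than raw ACK messages is what keeps a chatty follower from forming a "quorum" on its own.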

The figure below shows the core state-synchronization logic in the source code:

3. Leader Waits for Followers' State-Synchronization Requests

  1. Find the lead() method in Leader.java
java
void lead() throws IOException, InterruptedException {
    self.end_fle = Time.currentElapsedTime();
    long electionTimeTaken = self.end_fle - self.start_fle;
    self.setElectionTimeTaken(electionTimeTaken);
    ServerMetrics.getMetrics().ELECTION_TIME.add(electionTimeTaken);
    LOG.info("LEADING - LEADER ELECTION TOOK - {} {}", electionTimeTaken, QuorumPeer.FLE_TIME_UNIT);
    self.start_fle = 0;
    self.end_fle = 0;

    zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);

    try {
        self.setZabState(QuorumPeer.ZabState.DISCOVERY);
        self.tick.set(0);
        // Restore data into memory (it was actually already loaded at startup)
        zk.loadData();

        leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());

        // Start thread that waits for connection requests from
        // new followers.
        // Wait for the other follower nodes to send state-sync requests to the leader
        cnxAcceptor = new LearnerCnxAcceptor();
        cnxAcceptor.start();

        long epoch = getEpochToPropose(self.getMyId(), self.getAcceptedEpoch());

        zk.setZxid(ZxidUtils.makeZxid(epoch, 0));

        ......
    }
}
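lead() builds the leader's first zxid with `ZxidUtils.makeZxid(epoch, 0)`. A zxid packs the epoch into the high 32 bits and a per-epoch counter into the low 32 bits; the sketch below reproduces that bit layout, plus a hypothetical, non-blocking simplification of `getEpochToPropose()` (the real method blocks until a quorum that includes the leader has reported). `ZxidSketch` and `EpochProposer` are illustrative names, not ZooKeeper classes.

```java
import java.util.HashSet;
import java.util.Set;

// Zxid layout: high 32 bits = epoch, low 32 bits = counter within that epoch.
// The bit operations follow ZooKeeper's ZxidUtils; the class name is illustrative.
final class ZxidSketch {
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    static long getEpochFromZxid(long zxid) {
        return zxid >> 32L;
    }

    static long getCounterFromZxid(long zxid) {
        return zxid & 0xffffffffL;
    }

    public static void main(String[] args) {
        EpochProposer proposer = new EpochProposer(3);
        proposer.report(1, 4);              // leader itself (sid 1) last accepted epoch 4
        long epoch = proposer.report(2, 3); // a follower reports epoch 3: quorum reached
        System.out.println(epoch);          // 5
        System.out.println(Long.toHexString(makeZxid(epoch, 0))); // 500000000
    }
}

// Hypothetical, non-blocking simplification of Leader.getEpochToPropose():
// the new epoch is one more than the largest accepted epoch reported, and it
// is only returned once a majority of voting members has reported.
final class EpochProposer {
    private final int votingMembers;
    private final Set<Long> reported = new HashSet<>();
    private long epoch = -1;

    EpochProposer(int votingMembers) {
        this.votingMembers = votingMembers;
    }

    /** Returns the proposed epoch once a majority has reported, otherwise -1. */
    long report(long sid, long lastAcceptedEpoch) {
        if (lastAcceptedEpoch >= epoch) {
            epoch = lastAcceptedEpoch + 1; // strictly larger than any epoch seen so far
        }
        reported.add(sid);
        return reported.size() > votingMembers / 2 ? epoch : -1;
    }
}
```

Because the new epoch occupies the high bits, every zxid of the new leader compares greater than every zxid of the previous one, even if the old counter was large.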
  2. Listen asynchronously with multiple threads: LearnerCnxAcceptor.java
java
@Override
public void run() {
    if (!stop.get() && !serverSockets.isEmpty()) {
        ExecutorService executor = Executors.newFixedThreadPool(serverSockets.size());
        CountDownLatch latch = new CountDownLatch(serverSockets.size());

        serverSockets.forEach(serverSocket ->
                executor.submit(new LearnerCnxAcceptorHandler(serverSocket, latch)));

        try {
            latch.await();
        } catch (InterruptedException ie) {
            LOG.error("Interrupted while sleeping in LearnerCnxAcceptor.", ie);
        } finally {
            closeSockets();
            executor.shutdown();
            try {
                if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
                    LOG.error("not all the LearnerCnxAcceptorHandler terminated properly");
                }
            } catch (InterruptedException ie) {
                LOG.error("Interrupted while terminating LearnerCnxAcceptor.", ie);
            }
        }
    }
}
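LearnerCnxAcceptor.run() fans out one handler per listening socket and then waits on a CountDownLatch. The sketch below isolates that pattern, with plain strings standing in for ServerSockets; `FanOutSketch` and the "handled" strings are hypothetical.

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of the fan-out in LearnerCnxAcceptor.run(): one task per
// listening endpoint, a latch every task counts down in finally, then shutdown.
// Plain strings stand in for ServerSockets.
final class FanOutSketch {
    static List<String> runAll(List<String> endpoints) {
        if (endpoints.isEmpty()) {
            return List.of(); // newFixedThreadPool(0) would throw, so bail out early
        }
        ExecutorService executor = Executors.newFixedThreadPool(endpoints.size());
        CountDownLatch latch = new CountDownLatch(endpoints.size());
        ConcurrentLinkedQueue<String> handled = new ConcurrentLinkedQueue<>();

        for (String endpoint : endpoints) {
            executor.submit(() -> {
                try {
                    handled.add("handled " + endpoint); // stand-in for the accept loop
                } finally {
                    latch.countDown(); // count down even if the handler fails
                }
            });
        }
        try {
            latch.await(); // wait until every handler has exited
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt status
        } finally {
            executor.shutdown();
        }
        return List.copyOf(handled);
    }

    public static void main(String[] args) {
        System.out.println(runAll(List.of("10.0.0.1:2888", "10.0.0.2:2888")).size()); // 2
    }
}
```

Counting down in `finally` is the important detail: if one handler dies with an exception, the acceptor still wakes up instead of waiting forever.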
  3. LearnerCnxAcceptorHandler is a Runnable task; its run() method:
java
@Override
public void run() {
    try {
        Thread.currentThread().setName("LearnerCnxAcceptorHandler-" + serverSocket.getLocalSocketAddress());
        // Keep accepting followers' state-synchronization requests
        while (!stop.get()) {
            acceptConnections();
        }
    } catch (Exception e) {
        LOG.warn("Exception while accepting follower", e);
        if (fail.compareAndSet(false, true)) {
            handleException(getName(), e);
            halt();
        }
    } finally {
        latch.countDown();
    }
}

private void acceptConnections() throws IOException {
    Socket socket = null;
    boolean error = false;
    try {
        socket = serverSocket.accept();

        // start with the initLimit, once the ack is processed
        // in LearnerHandler switch to the syncLimit
        socket.setSoTimeout(self.tickTime * self.initLimit);
        socket.setTcpNoDelay(nodelay);

        BufferedInputStream is = new BufferedInputStream(socket.getInputStream());
        // As soon as a follower's request arrives, create a LearnerHandler to process it
        LearnerHandler fh = new LearnerHandler(socket, is, Leader.this);
        // Start the handler thread
        fh.start();
    } catch (SocketException e) {
        .......
    }
}

4. Follower Finds and Connects to the Leader

  1. The followLeader() method
java
void followLeader() throws InterruptedException {
    ......

    try {
        self.setZabState(QuorumPeer.ZabState.DISCOVERY);
        // Find the leader
        QuorumServer leaderServer = findLeader();
        try {
            // Connect to the leader
            connectToLeader(leaderServer.addr, leaderServer.hostname);
            connectionTime = System.currentTimeMillis();
            // Register with the leader
            long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
            if (self.isReconfigStateChange()) {
                throw new Exception("learned about role change");
            }
            //check to see if the leader zxid is lower than ours
            //this should never happen but is just a safety check
            long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
            if (newEpoch < self.getAcceptedEpoch()) {
                LOG.error("Proposed leader epoch "
                            + ZxidUtils.zxidToString(newEpochZxid)
                            + " is less than our accepted epoch "
                            + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
                throw new IOException("Error: Epoch of leader is lower");
            }
            long startTime = Time.currentElapsedTime();
            self.setLeaderAddressAndId(leaderServer.addr, leaderServer.getId());
            self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
            syncWithLeader(newEpochZxid);
            self.setZabState(QuorumPeer.ZabState.BROADCAST);
            completedSync = true;
            long syncTime = Time.currentElapsedTime() - startTime;
            ServerMetrics.getMetrics().FOLLOWER_SYNC_TIME.add(syncTime);
            if (self.getObserverMasterPort() > 0) {
                LOG.info("Starting ObserverMaster");

                om = new ObserverMaster(self, fzk, self.getObserverMasterPort());
                om.start();
            } else {
                om = null;
            }
            // create a reusable packet to reduce gc impact
            QuorumPacket qp = new QuorumPacket();
            while (this.isRunning()) {
                readPacket(qp);
                processPacket(qp);
            }
        } catch (Exception e) {
            LOG.warn("Exception when following the leader", e);
            closeSocket();

            // clear pending revalidations
            pendingRevalidations.clear();
        }
    } finally {
        ......
    }
}
  2. The findLeader() method, which locates the Leader
    Learner.java
java
protected QuorumServer findLeader() {
    QuorumServer leaderServer = null;
    // Find the leader by id
    // Recorded during the election: the sid of the leader we finally voted for
    Vote current = self.getCurrentVote();
    // Look for that sid among all configured servers
    for (QuorumServer s : self.getView().values()) {
        if (s.id == current.getId()) {
            // Ensure we have the leader's correct IP address before
            // attempting to connect.
            // i.e. re-resolve the leader's hostname into fresh socket addresses
            s.recreateSocketAddresses();
            leaderServer = s;
            break;
        }
    }
    if (leaderServer == null) {
        LOG.warn("Couldn't find the leader with id = {}", current.getId());
    }
    return leaderServer;
}
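findLeader() scans the current view for the server whose id matches the sid in our final vote. A minimal sketch, with a hypothetical `ServerInfo` standing in for `QuorumPeer.QuorumServer` and `Optional` replacing the original's null return:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of Learner.findLeader(): scan the configured servers for
// the one whose id equals the leader sid recorded in our final vote.
// ServerInfo is an illustrative stand-in for QuorumPeer.QuorumServer.
final class FindLeaderSketch {
    static final class ServerInfo {
        final long id;
        final String host;

        ServerInfo(long id, String host) {
            this.id = id;
            this.host = host;
        }
    }

    static Optional<ServerInfo> findLeader(Map<Long, ServerInfo> view, long votedLeaderId) {
        for (ServerInfo s : view.values()) {
            if (s.id == votedLeaderId) {
                return Optional.of(s); // the real code also re-resolves the address here
            }
        }
        return Optional.empty(); // leader sid is not in our view
    }

    public static void main(String[] args) {
        Map<Long, ServerInfo> view = new LinkedHashMap<>();
        view.put(1L, new ServerInfo(1, "zk1"));
        view.put(2L, new ServerInfo(2, "zk2"));
        view.put(3L, new ServerInfo(3, "zk3"));
        System.out.println(findLeader(view, 3).map(s -> s.host).orElse("none")); // zk3
    }
}
```

Since the view is keyed by sid, `view.get(id)` would work too; the loop simply mirrors the original.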
  3. Connecting to the Leader
    Learner.java
java
protected void connectToLeader(MultipleAddresses multiAddr, String hostname) throws IOException {

    this.leaderAddr = multiAddr;
    Set<InetSocketAddress> addresses;
    if (self.isMultiAddressReachabilityCheckEnabled()) {
        // even if none of the addresses are reachable, we want to try to establish connection
        // see ZOOKEEPER-3758
        addresses = multiAddr.getAllReachableAddressesOrAll();
    } else {
        addresses = multiAddr.getAllAddresses();
    }
    ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
    CountDownLatch latch = new CountDownLatch(addresses.size());
    AtomicReference<Socket> socket = new AtomicReference<>(null);
    addresses.stream().map(address -> new LeaderConnector(address, socket, latch)).forEach(executor::submit);

    try {
        latch.await();
    } catch (InterruptedException e) {
        LOG.warn("Interrupted while trying to connect to Leader", e);
    } finally {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
                LOG.error("not all the LeaderConnector terminated properly");
            }
        } catch (InterruptedException ie) {
            LOG.error("Interrupted while terminating LeaderConnector executor.", ie);
        }
    }

    if (socket.get() == null) {
        throw new IOException("Failed connect to " + multiAddr);
    } else {
        sock = socket.get();
        sockBeingClosed.set(false);
    }

    self.authLearner.authenticate(sock, hostname);

    leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
    bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
    leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
    if (asyncSending) {
        startSendingThread();
    }
}
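connectToLeader() tries every address of the leader in parallel and keeps only the first socket that connects. The sketch below shows that "first one wins" pattern with `AtomicReference.compareAndSet`; `FakeConnection` is a hypothetical stand-in for a `Socket`, and every simulated connect succeeds immediately.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the "first successful connection wins" pattern in
// connectToLeader()/LeaderConnector. FakeConnection stands in for a Socket.
final class FirstWinsSketch {
    static final class FakeConnection {
        final String address;
        volatile boolean closed;

        FakeConnection(String address) {
            this.address = address;
        }

        void close() {
            closed = true;
        }
    }

    /** Connects to all addresses in parallel; opened must be thread-safe. */
    static FakeConnection connectAny(List<String> addresses, List<FakeConnection> opened) {
        if (addresses.isEmpty()) {
            return null;
        }
        ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
        CountDownLatch latch = new CountDownLatch(addresses.size());
        AtomicReference<FakeConnection> winner = new AtomicReference<>(null);
        for (String address : addresses) {
            executor.submit(() -> {
                try {
                    FakeConnection conn = new FakeConnection(address); // simulated connect
                    opened.add(conn);
                    if (!winner.compareAndSet(null, conn)) {
                        conn.close(); // already connected elsewhere: drop the redundant one
                    }
                } finally {
                    latch.countDown();
                }
            });
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return winner.get();
    }

    public static void main(String[] args) {
        List<FakeConnection> opened = Collections.synchronizedList(new ArrayList<>());
        FakeConnection w = connectAny(List.of("10.0.0.1:2888", "10.0.0.2:2888"), opened);
        System.out.println("kept " + w.address);
    }
}
```

Which address wins depends on thread scheduling; the invariant is only that exactly one connection stays open.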
  4. Under the hood, connections are made by multiple threads; the run() method of the LeaderConnector class performs the connection
java
@Override
public void run() {
    try {
        Thread.currentThread().setName("LeaderConnector-" + address);
        // Actually open the remote connection to the leader
        Socket sock = connectToLeader();

        if (sock != null && sock.isConnected()) {
            if (socket.compareAndSet(null, sock)) {
                LOG.info("Successfully connected to leader, using address: {}", address);
            } else {
                LOG.info("Connection to the leader is already established, close the redundant connection");
                sock.close();
            }
        }

    } catch (Exception e) {
        LOG.error("Failed connect to {}", address, e);
    } finally {
        latch.countDown();
    }
}

private Socket connectToLeader() throws IOException, X509Exception, InterruptedException {
    Socket sock = createSocket();

    // leader connection timeout defaults to tickTime * initLimit
    int connectTimeout = self.tickTime * self.initLimit;

    // but if connectToLearnerMasterLimit is specified, use that value to calculate
    // timeout instead of using the initLimit value
    if (self.connectToLearnerMasterLimit > 0) {
        connectTimeout = self.tickTime * self.connectToLearnerMasterLimit;
    }

    int remainingTimeout;
    long startNanoTime = nanoTime();
    // Try to connect to the leader up to five times
    for (int tries = 0; tries < 5 && socket.get() == null; tries++) {
        try {
            // recalculate the init limit time because retries sleep for 1000 milliseconds
            remainingTimeout = connectTimeout - (int) ((nanoTime() - startNanoTime) / 1_000_000);
            if (remainingTimeout <= 0) {
                LOG.error("connectToLeader exceeded on retries.");
                throw new IOException("connectToLeader exceeded on retries.");
            }
            // Low-level socket connect
            sockConnect(sock, address, Math.min(connectTimeout, remainingTimeout));
            if (self.isSslQuorum()) {
                ((SSLSocket) sock).startHandshake();
            }
            sock.setTcpNoDelay(nodelay);
            break;
        } catch (IOException e) {
            ......
        }
        Thread.sleep(leaderConnectDelayDuringRetryMs);
    }

    return sock;
}
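LeaderConnector.connectToLeader() retries up to five times, but each attempt only gets whatever remains of the overall connect timeout (tickTime * initLimit, or tickTime * connectToLearnerMasterLimit when that is set). A minimal generic sketch of that retry-with-budget loop; `RetrySketch`, `Attempt`, and the use of the unchecked `IllegalStateException` are assumptions, not ZooKeeper APIs.

```java
// Hypothetical sketch of a retry loop bounded by both a try count and an
// overall time budget, modeled on LeaderConnector.connectToLeader().
// Attempt stands in for sockConnect(); it receives the per-try timeout in ms.
final class RetrySketch {
    @FunctionalInterface
    interface Attempt<T> {
        T tryOnce(int timeoutMs) throws Exception;
    }

    static <T> T retryWithBudget(Attempt<T> attempt, int budgetMs, int maxTries, long sleepMs) {
        long start = System.nanoTime();
        Exception last = null;
        for (int tries = 0; tries < maxTries; tries++) {
            // recompute what is left of the budget, since earlier tries and sleeps used some
            int remaining = budgetMs - (int) ((System.nanoTime() - start) / 1_000_000);
            if (remaining <= 0) {
                throw new IllegalStateException("time budget exceeded on retries", last);
            }
            try {
                return attempt.tryOnce(remaining);
            } catch (Exception e) {
                last = e; // remember the failure and retry after a pause
            }
            if (tries + 1 < maxTries) {
                try {
                    Thread.sleep(sleepMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while retrying", ie);
                }
            }
        }
        throw new IllegalStateException("gave up after " + maxTries + " tries", last);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = retryWithBudget(timeoutMs -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new java.io.IOException("connection refused"); // simulated failure
            }
            return "connected";
        }, 5_000, 5, 100);
        System.out.println(result + " after " + calls[0] + " tries"); // connected after 3 tries
    }
}
```

Unlike the original loop, this sketch skips the sleep after the final failed attempt, since nothing follows it.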

5. Creating a LearnerHandler to Process the Connection Request

  1. Back in the Leader source: once a connection is accepted, the request data is processed in LearnerCnxAcceptorHandler.java
java
private void acceptConnections() throws IOException {
    Socket socket = null;
    boolean error = false;
    try {
        socket = serverSocket.accept();

        // start with the initLimit, once the ack is processed
        // in LearnerHandler switch to the syncLimit
        socket.setSoTimeout(self.tickTime * self.initLimit);
        socket.setTcpNoDelay(nodelay);

        BufferedInputStream is = new BufferedInputStream(socket.getInputStream());
        LearnerHandler fh = new LearnerHandler(socket, is, Leader.this);
        fh.start();
    } catch (SocketException e) {
        error = true;
        if (stop.get()) {
            LOG.warn("Exception while shutting down acceptor.", e);
        } else {
            throw e;
        }
    } catch (SaslException e) {
        LOG.error("Exception while connecting to quorum learner", e);
        error = true;
    } catch (Exception e) {
        error = true;
        throw e;
    } finally {
        // Don't leak sockets on errors
        if (error && socket != null && !socket.isClosed()) {
            try {
                socket.close();
            } catch (IOException e) {
                LOG.warn("Error closing socket: " + socket, e);
            }
        }
    }
}
  2. LearnerHandler is a thread class; look at its run() method
java
@Override
public void run() {
    try {
        learnerMaster.addLearnerHandler(this);
        // Tick by which the learner's first ACK must arrive
        tickOfNextAckDeadline = learnerMaster.getTickOfInitialAckDeadline();

        ia = BinaryInputArchive.getArchive(bufferedInput);
        bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
        oa = BinaryOutputArchive.getArchive(bufferedOutput);
        // Receive a message from the network and deserialize it into a packet
        QuorumPacket qp = new QuorumPacket();
        ia.readRecord(qp, "packet");

        messageTracker.trackReceived(qp.getType());
        // After the election, observers and followers must first send the leader an identifying packet:
        // FOLLOWERINFO or OBSERVERINFO
        if (qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO) {
            LOG.error("First packet {} is not FOLLOWERINFO or OBSERVERINFO!", qp.toString());

            return;
        }

        if (learnerMaster instanceof ObserverMaster && qp.getType() != Leader.OBSERVERINFO) {
            throw new IOException("Non observer attempting to connect to ObserverMaster. type = " + qp.getType());
        }
        byte[] learnerInfoData = qp.getData();
        if (learnerInfoData != null) {
            ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
            if (learnerInfoData.length >= 8) {
                this.sid = bbsid.getLong();
            }
            if (learnerInfoData.length >= 12) {
                this.version = bbsid.getInt(); // protocolVersion
            }
            if (learnerInfoData.length >= 20) {
                long configVersion = bbsid.getLong();
                if (configVersion > learnerMaster.getQuorumVerifierVersion()) {
                    throw new IOException("Follower is ahead of the leader (has a later activated configuration)");
                }
            }
        } else {
            this.sid = learnerMaster.getAndDecrementFollowerCounter();
        }

        String followerInfo = learnerMaster.getPeerInfo(this.sid);
        if (followerInfo.isEmpty()) {
            LOG.info(
                "Follower sid: {} not in the current config {}",
                this.sid,
                Long.toHexString(learnerMaster.getQuorumVerifierVersion()));
        } else {
            LOG.info("Follower sid: {} : info : {}", this.sid, followerInfo);
        }

        if (qp.getType() == Leader.OBSERVERINFO) {
            learnerType = LearnerType.OBSERVER;
        }

        learnerMaster.registerLearnerHandlerBean(this, sock);
        // Read the lastAcceptedEpoch sent by the follower;
        // during the election, the epoch in use is still the previous leader's epoch
        long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());

        long peerLastZxid;
        StateSummary ss = null;
        // Read the zxid sent by the follower
        long zxid = qp.getZxid();
        // The leader builds the new epoch from the follower's sid and old epoch
        long newEpoch = learnerMaster.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
        long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);

        if (this.getVersion() < 0x10000) {
            // we are going to have to extrapolate the epoch information
            long epoch = ZxidUtils.getEpochFromZxid(zxid);
            ss = new StateSummary(epoch, zxid);
            // fake the message
            learnerMaster.waitForEpochAck(this.getSid(), ss);
        } else {
            byte[] ver = new byte[4];
            ByteBuffer.wrap(ver).putInt(0x10000);
            // The leader sends LEADERINFO to the follower (newLeaderZxid, which encodes newEpoch)
            QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
            oa.writeRecord(newEpochPacket, "packet");
            messageTracker.trackSent(Leader.LEADERINFO);
            bufferedOutput.flush();
            QuorumPacket ackEpochPacket = new QuorumPacket();
            ia.readRecord(ackEpochPacket, "packet");
            messageTracker.trackReceived(ackEpochPacket.getType());
            if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
                LOG.error("{} is not ACKEPOCH", ackEpochPacket.toString());
                return;
            }
            ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
            ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
            learnerMaster.waitForEpochAck(this.getSid(), ss);
        }
        peerLastZxid = ss.getLastZxid();

        // Take any necessary action if we need to send TRUNC or DIFF
        // startForwarding() will be called in all cases
        boolean needSnap = syncFollower(peerLastZxid, learnerMaster);

        // syncs between followers and the leader are exempt from throttling because it
        // is important to keep the state of quorum servers up-to-date. The exempted syncs
        // are counted as concurrent syncs though
        boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
        /* if we are not truncating or sending a diff just send a snapshot */
        if (needSnap) {
            syncThrottler = learnerMaster.getLearnerSnapSyncThrottler();
            syncThrottler.beginSync(exemptFromThrottle);
            ServerMetrics.getMetrics().INFLIGHT_SNAP_COUNT.add(syncThrottler.getSyncInProgress());
            try {
                long zxidToSend = learnerMaster.getZKDatabase().getDataTreeLastProcessedZxid();
                oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
                messageTracker.trackSent(Leader.SNAP);
                bufferedOutput.flush();

                LOG.info(
                    "Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, "
                        + "send zxid of db as 0x{}, {} concurrent snapshot sync, "
                        + "snapshot sync was {} from throttle",
                    Long.toHexString(peerLastZxid),
                    Long.toHexString(leaderLastZxid),
                    Long.toHexString(zxidToSend),
                    syncThrottler.getSyncInProgress(),
                    exemptFromThrottle ? "exempt" : "not exempt");
                // Dump data to peer
                learnerMaster.getZKDatabase().serializeSnapshot(oa);
                oa.writeString("BenWasHere", "signature");
                bufferedOutput.flush();
            } finally {
                ServerMetrics.getMetrics().SNAP_COUNT.add(1);
            }
        } else {
            syncThrottler = learnerMaster.getLearnerDiffSyncThrottler();
            syncThrottler.beginSync(exemptFromThrottle);
            ServerMetrics.getMetrics().INFLIGHT_DIFF_COUNT.add(syncThrottler.getSyncInProgress());
            ServerMetrics.getMetrics().DIFF_COUNT.add(1);
        }

        LOG.debug("Sending NEWLEADER message to {}", sid);
        // the version of this quorumVerifier will be set by leader.lead() in case
        // the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
        // we got here, so the version was set
        if (getVersion() < 0x10000) {
            QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, null, null);
            oa.writeRecord(newLeaderQP, "packet");
        } else {
            QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, learnerMaster.getQuorumVerifierBytes(), null);
            queuedPackets.add(newLeaderQP);
        }
        bufferedOutput.flush();

        // Start thread that blast packets in the queue to learner
        startSendingPackets();

        /*
            * Have to wait for the first ACK, wait until
            * the learnerMaster is ready, and only then we can
            * start processing messages.
            */
        qp = new QuorumPacket();
        ia.readRecord(qp, "packet");

        messageTracker.trackReceived(qp.getType());
        if (qp.getType() != Leader.ACK) {
            LOG.error("Next packet was supposed to be an ACK, but received packet: {}", packetToString(qp));
            return;
        }

        LOG.debug("Received NEWLEADER-ACK message from {}", sid);

        learnerMaster.waitForNewLeaderAck(getSid(), qp.getZxid());

        syncLimitCheck.start();
        // sync ends when NEWLEADER-ACK is received
        syncThrottler.endSync();
        if (needSnap) {
            ServerMetrics.getMetrics().INFLIGHT_SNAP_COUNT.add(syncThrottler.getSyncInProgress());
        } else {
            ServerMetrics.getMetrics().INFLIGHT_DIFF_COUNT.add(syncThrottler.getSyncInProgress());
        }
        syncThrottler = null;

        // now that the ack has been processed expect the syncLimit
        sock.setSoTimeout(learnerMaster.syncTimeout());

        /*
            * Wait until learnerMaster starts up
            */
        learnerMaster.waitForStartup();

        // Mutation packets will be queued during the serialize,
        // so we need to mark when the peer can actually start
        // using the data
        //
        LOG.debug("Sending UPTODATE message to {}", sid);
        queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));

        while (true) {
            qp = new QuorumPacket();
            ia.readRecord(qp, "packet");
            messageTracker.trackReceived(qp.getType());

            if (LOG.isTraceEnabled()) {
                long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
                if (qp.getType() == Leader.PING) {
                    traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
                }
                ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
            }
            tickOfNextAckDeadline = learnerMaster.getTickOfNextAckDeadline();

            packetsReceived.incrementAndGet();

            ByteBuffer bb;
            long sessionId;
            int cxid;
            int type;

            switch (qp.getType()) {
            case Leader.ACK:
                if (this.learnerType == LearnerType.OBSERVER) {
                    LOG.debug("Received ACK from Observer {}", this.sid);
                }
                syncLimitCheck.updateAck(qp.getZxid());
                learnerMaster.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
                break;
            case Leader.PING:
                // Process the touches
                ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
                DataInputStream dis = new DataInputStream(bis);
                while (dis.available() > 0) {
                    long sess = dis.readLong();
                    int to = dis.readInt();
                    learnerMaster.touch(sess, to);
                }
                break;
            case Leader.REVALIDATE:
                ServerMetrics.getMetrics().REVALIDATE_COUNT.add(1);
                learnerMaster.revalidateSession(qp, this);
                break;
            case Leader.REQUEST:
                bb = ByteBuffer.wrap(qp.getData());
                sessionId = bb.getLong();
                cxid = bb.getInt();
                type = bb.getInt();
                bb = bb.slice();
                Request si;
                if (type == OpCode.sync) {
                    si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
                } else {
                    si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
                }
                si.setOwner(this);
                learnerMaster.submitLearnerRequest(si);
                requestsReceived.incrementAndGet();
                break;
            default:
                LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));
                break;
            }
        }
    } catch (IOException e) {
        LOG.error("Unexpected exception in LearnerHandler: ", e);
        closeSocket();
    } catch (InterruptedException e) {
        LOG.error("Unexpected exception in LearnerHandler.", e);
    } catch (SyncThrottleException e) {
        LOG.error("too many concurrent sync.", e);
        syncThrottler = null;
    } catch (Exception e) {
        LOG.error("Unexpected exception in LearnerHandler.", e);
        throw e;
    } finally {
        if (syncThrottler != null) {
            syncThrottler.endSync();
            syncThrottler = null;
        }
        String remoteAddr = getRemoteAddress();
        LOG.warn("******* GOODBYE {} ********", remoteAddr);
        messageTracker.dumpToLog(remoteAddr);
        shutdown();
    }
}
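The first packet LearnerHandler.run() reads is FOLLOWERINFO or OBSERVERINFO, whose payload it decodes field by field depending on its length. A sketch of both sides, assuming the follower's `LearnerInfo` serializes as a big-endian sid (8 bytes), protocol version (4 bytes), and config version (8 bytes), which matches the 8/12/20-byte length checks in the code above. `LearnerInfoSketch` is hypothetical, and the default version 0x1 for learners that omit the field is an assumption.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical sketch of the FOLLOWERINFO payload: sid (8 bytes), protocol
// version (4 bytes), config version (8 bytes), big-endian. Assumes this is how
// the follower's LearnerInfo record is serialized; names are illustrative.
final class LearnerInfoSketch {
    static final class Decoded {
        long sid = -1;           // -1: not present in the payload
        int version = 0x1;       // assumed default for learners that omit the field
        long configVersion = -1; // -1: not present in the payload
    }

    static byte[] encode(long sid, int protocolVersion, long configVersion) {
        return ByteBuffer.allocate(20)
                .putLong(sid)
                .putInt(protocolVersion)
                .putLong(configVersion)
                .array();
    }

    // Read each field only if the payload is long enough, like LearnerHandler.run().
    static Decoded decode(byte[] data) {
        Decoded d = new Decoded();
        ByteBuffer bb = ByteBuffer.wrap(data);
        if (data.length >= 8) {
            d.sid = bb.getLong();
        }
        if (data.length >= 12) {
            d.version = bb.getInt();
        }
        if (data.length >= 20) {
            d.configVersion = bb.getLong();
        }
        return d;
    }

    // Learners at protocol 0x10000 or later get the LEADERINFO/ACKEPOCH exchange.
    static boolean usesLeaderInfoHandshake(Decoded d) {
        return d.version >= 0x10000;
    }

    public static void main(String[] args) {
        byte[] payload = encode(3, 0x10000, 0x100000000L);
        Decoded full = decode(payload);
        Decoded oldStyle = decode(Arrays.copyOf(payload, 8)); // only the sid
        System.out.println(full.sid + " " + usesLeaderInfoHandshake(full));         // 3 true
        System.out.println(oldStyle.sid + " " + usesLeaderInfoHandshake(oldStyle)); // 3 false
    }
}
```

The length-gated reads are what let older learners, which send shorter payloads, still join: they simply take the `getVersion() < 0x10000` branch.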

6. Registering with the Leader: registerWithLeader()

  1. Back to the Follower.followLeader() method
java
void followLeader() throws InterruptedException {
    ......

    try {
        self.setZabState(QuorumPeer.ZabState.DISCOVERY);
        // 查找 leader
        QuorumServer leaderServer = findLeader();
        try {
            // Connect to the leader
            connectToLeader(leaderServer.addr, leaderServer.hostname);
            connectionTime = System.currentTimeMillis();
            // Register with the leader
            long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
            if (self.isReconfigStateChange()) {
                throw new Exception("learned about role change");
            }
            //check to see if the leader zxid is lower than ours
            //this should never happen but is just a safety check
            long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
            if (newEpoch < self.getAcceptedEpoch()) {
                LOG.error("Proposed leader epoch "
                            + ZxidUtils.zxidToString(newEpochZxid)
                            + " is less than our accepted epoch "
                            + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
                throw new IOException("Error: Epoch of leader is lower");
            }
            long startTime = Time.currentElapsedTime();
            self.setLeaderAddressAndId(leaderServer.addr, leaderServer.getId());
            self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
            syncWithLeader(newEpochZxid);
            self.setZabState(QuorumPeer.ZabState.BROADCAST);
            completedSync = true;
            long syncTime = Time.currentElapsedTime() - startTime;
            ServerMetrics.getMetrics().FOLLOWER_SYNC_TIME.add(syncTime);
            if (self.getObserverMasterPort() > 0) {
                LOG.info("Starting ObserverMaster");

                om = new ObserverMaster(self, fzk, self.getObserverMasterPort());
                om.start();
            } else {
                om = null;
            }
            // create a reusable packet to reduce gc impact
            QuorumPacket qp = new QuorumPacket();
            while (this.isRunning()) {
                // Read the next packet
                readPacket(qp);
                // Process the packet
                processPacket(qp);
            }
        } catch (Exception e) {
            LOG.warn("Exception when following the leader", e);
            closeSocket();

            // clear pending revalidations
            pendingRevalidations.clear();
        }
    } finally {
        ......
    }
}
  2. Registering with the Leader
java
protected long registerWithLeader(int pktType) throws IOException {
    /*
    * Send follower info, including last zxid and sid
    */
    long lastLoggedZxid = self.getLastLoggedZxid();
    QuorumPacket qp = new QuorumPacket();
    qp.setType(pktType);
    qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));

    /*
    * Add sid to payload
    */
    LearnerInfo li = new LearnerInfo(self.getMyId(), 0x10000, self.getQuorumVerifier().getVersion());
    ByteArrayOutputStream bsid = new ByteArrayOutputStream();
    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
    boa.writeRecord(li, "LearnerInfo");
    qp.setData(bsid.toByteArray());
    // Send FOLLOWERINFO to the leader
    writePacket(qp, true);
    // Read the leader's reply: LEADERINFO
    readPacket(qp);
    final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
    // If LEADERINFO was received
    if (qp.getType() == Leader.LEADERINFO) {
        // we are connected to a 1.0 server so accept the new epoch and read the next packet
        leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
        byte[] epochBytes = new byte[4];
        final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
        // Accept the leader's epoch
        if (newEpoch > self.getAcceptedEpoch()) {
            // Store our own current epoch in wrappedEpochBytes
            wrappedEpochBytes.putInt((int) self.getCurrentEpoch());
            // Save the epoch sent by the leader as our accepted epoch
            self.setAcceptedEpoch(newEpoch);
        } else if (newEpoch == self.getAcceptedEpoch()) {
            // since we have already acked an epoch equal to the leaders, we cannot ack
            // again, but we still need to send our lastZxid to the leader so that we can
            // sync with it if it does assume leadership of the epoch.
            // the -1 indicates that this reply should not count as an ack for the new epoch
            wrappedEpochBytes.putInt(-1);
        } else {
            throw new IOException("Leaders epoch, "
                                    + newEpoch
                                    + " is less than accepted epoch, "
                                    + self.getAcceptedEpoch());
        }
        // Send ACKEPOCH to the leader (carrying our own epoch and zxid)
        QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
        writePacket(ackNewEpoch, true);
        return ZxidUtils.makeZxid(newEpoch, 0);
    } else {
        if (newEpoch > self.getAcceptedEpoch()) {
            self.setAcceptedEpoch(newEpoch);
        }
        if (qp.getType() != Leader.NEWLEADER) {
            LOG.error("First packet should have been NEWLEADER");
            throw new IOException("First packet should have been NEWLEADER");
        }
        return qp.getZxid();
    }
}
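registerWithLeader() answers LEADERINFO with an ACKEPOCH whose 4-byte payload depends on how the leader's epoch compares with the follower's accepted epoch. A minimal sketch of that decision; `AckEpochSketch` and `PeerEpochs` are hypothetical, and it throws `IllegalStateException` where the original throws `IOException`.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the follower's epoch check in registerWithLeader()
// after LEADERINFO arrives. Returns the 4-byte ACKEPOCH payload.
final class AckEpochSketch {
    static final class PeerEpochs {
        long acceptedEpoch; // epoch this peer has accepted so far
        long currentEpoch;  // current epoch stored by this peer

        PeerEpochs(long acceptedEpoch, long currentEpoch) {
            this.acceptedEpoch = acceptedEpoch;
            this.currentEpoch = currentEpoch;
        }
    }

    static byte[] ackEpochPayload(PeerEpochs self, long leaderEpoch) {
        ByteBuffer payload = ByteBuffer.allocate(4);
        if (leaderEpoch > self.acceptedEpoch) {
            payload.putInt((int) self.currentEpoch); // tell the leader our current epoch
            self.acceptedEpoch = leaderEpoch;        // and accept the leader's new epoch
        } else if (leaderEpoch == self.acceptedEpoch) {
            payload.putInt(-1); // already acked this epoch: must not count as a new ack
        } else {
            throw new IllegalStateException("Leader's epoch " + leaderEpoch
                    + " is less than accepted epoch " + self.acceptedEpoch);
        }
        return payload.array();
    }

    public static void main(String[] args) {
        PeerEpochs me = new PeerEpochs(4, 4);
        System.out.println(ByteBuffer.wrap(ackEpochPayload(me, 5)).getInt()); // 4
        System.out.println(ByteBuffer.wrap(ackEpochPayload(me, 5)).getInt()); // -1
    }
}
```

On the leader side, LearnerHandler reads this value back with `bbepoch.getInt()` and wraps it in a `StateSummary` together with the follower's last zxid.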

7. Leader Receives the Follower's State and Sends Sync Messages According to the Chosen Sync Method

  1. Back in the Leader again, receiving the state message: LearnerHandler.java
java
@Override
public void run() {
    try {
        learnerMaster.addLearnerHandler(this);
        // Tick by which the learner's first ACK must arrive
        tickOfNextAckDeadline = learnerMaster.getTickOfInitialAckDeadline();

        ia = BinaryInputArchive.getArchive(bufferedInput);
        bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
        oa = BinaryOutputArchive.getArchive(bufferedOutput);
        // Receive a message from the network and deserialize it into a packet
        QuorumPacket qp = new QuorumPacket();
        ia.readRecord(qp, "packet");

        messageTracker.trackReceived(qp.getType());
        // After the election, observers and followers must first send the leader an identifying packet: FOLLOWERINFO or OBSERVERINFO
        if (qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO) {
            LOG.error("First packet {} is not FOLLOWERINFO or OBSERVERINFO!", qp.toString());
            return;
        }

        if (learnerMaster instanceof ObserverMaster && qp.getType() != Leader.OBSERVERINFO) {
            throw new IOException("Non observer attempting to connect to ObserverMaster. type = " + qp.getType());
        }
        byte[] learnerInfoData = qp.getData();
        if (learnerInfoData != null) {
            ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
            if (learnerInfoData.length >= 8) {
                this.sid = bbsid.getLong();
            }
            if (learnerInfoData.length >= 12) {
                this.version = bbsid.getInt(); // protocolVersion
            }
            if (learnerInfoData.length >= 20) {
                long configVersion = bbsid.getLong();
                if (configVersion > learnerMaster.getQuorumVerifierVersion()) {
                    throw new IOException("Follower is ahead of the leader (has a later activated configuration)");
                }
            }
        } else {
            this.sid = learnerMaster.getAndDecrementFollowerCounter();
        }

        String followerInfo = learnerMaster.getPeerInfo(this.sid);
        if (followerInfo.isEmpty()) {
            LOG.info(
                "Follower sid: {} not in the current config {}",
                this.sid,
                Long.toHexString(learnerMaster.getQuorumVerifierVersion()));
        } else {
            LOG.info("Follower sid: {} : info : {}", this.sid, followerInfo);
        }

        if (qp.getType() == Leader.OBSERVERINFO) {
            learnerType = LearnerType.OBSERVER;
        }

        learnerMaster.registerLearnerHandlerBean(this, sock);
        // Read the lastAcceptedEpoch sent by the Follower;
        // the epoch used during the election is still the previous leader's epoch
        long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
        
        long peerLastZxid;
        StateSummary ss = null;
        // Read the zxid sent by the follower
        long zxid = qp.getZxid();
        // Get the epoch to propose: the new leader builds a new epoch that is
        // larger than every acceptedEpoch collected from a quorum of servers
        long newEpoch = learnerMaster.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
        long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);

        if (this.getVersion() < 0x10000) {
            // we are going to have to extrapolate the epoch information
            long epoch = ZxidUtils.getEpochFromZxid(zxid);
            ss = new StateSummary(epoch, zxid);
            // fake the message
            learnerMaster.waitForEpochAck(this.getSid(), ss);
        } else {
            byte[] ver = new byte[4];
            ByteBuffer.wrap(ver).putInt(0x10000);
            // The leader sends LEADERINFO to the follower (carrying newLeaderZxid, i.e. newEpoch with counter 0)
            QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
            oa.writeRecord(newEpochPacket, "packet");
            messageTracker.trackSent(Leader.LEADERINFO);
            bufferedOutput.flush();
            // Read the follower's ACKEPOCH reply
            QuorumPacket ackEpochPacket = new QuorumPacket();
            ia.readRecord(ackEpochPacket, "packet");
            messageTracker.trackReceived(ackEpochPacket.getType());
            if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
                LOG.error("{} is not ACKEPOCH", ackEpochPacket.toString());
                return;
            }
            ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
            // Record the peer's (follower or observer) state: its epoch and last zxid
            ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
            learnerMaster.waitForEpochAck(this.getSid(), ss);
        }
        peerLastZxid = ss.getLastZxid();

        // Take any necessary action if we need to send TRUNC or DIFF
        // startForwarding() will be called in all cases
        // syncFollower() decides whether and how the Leader and Follower must sync; it returns true if a full snapshot is needed
        boolean needSnap = syncFollower(peerLastZxid, learnerMaster);

        // syncs between followers and the leader are exempt from throttling because it
        // is important to keep the state of quorum servers up-to-date. The exempted syncs
        // are counted as concurrent syncs though
        boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
        /* if we are not truncating or sending a diff just send a snapshot */
        if (needSnap) {
            syncThrottler = learnerMaster.getLearnerSnapSyncThrottler();
            syncThrottler.beginSync(exemptFromThrottle);
            ServerMetrics.getMetrics().INFLIGHT_SNAP_COUNT.add(syncThrottler.getSyncInProgress());
            try {
                long zxidToSend = learnerMaster.getZKDatabase().getDataTreeLastProcessedZxid();
                oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
                messageTracker.trackSent(Leader.SNAP);
                bufferedOutput.flush();

                LOG.info(
                    "Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, "
                        + "send zxid of db as 0x{}, {} concurrent snapshot sync, "
                        + "snapshot sync was {} from throttle",
                    Long.toHexString(peerLastZxid),
                    Long.toHexString(leaderLastZxid),
                    Long.toHexString(zxidToSend),
                    syncThrottler.getSyncInProgress(),
                    exemptFromThrottle ? "exempt" : "not exempt");
                // Dump data to peer
                learnerMaster.getZKDatabase().serializeSnapshot(oa);
                oa.writeString("BenWasHere", "signature");
                bufferedOutput.flush();
            } finally {
                ServerMetrics.getMetrics().SNAP_COUNT.add(1);
            }
        } else {
            syncThrottler = learnerMaster.getLearnerDiffSyncThrottler();
            syncThrottler.beginSync(exemptFromThrottle);
            ServerMetrics.getMetrics().INFLIGHT_DIFF_COUNT.add(syncThrottler.getSyncInProgress());
            ServerMetrics.getMetrics().DIFF_COUNT.add(1);
        }

        LOG.debug("Sending NEWLEADER message to {}", sid);
        // the version of this quorumVerifier will be set by leader.lead() in case
        // the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
        // we got here, so the version was set
        if (getVersion() < 0x10000) {
            QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, null, null);
            oa.writeRecord(newLeaderQP, "packet");
        } else {
            QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, learnerMaster.getQuorumVerifierBytes(), null);
            queuedPackets.add(newLeaderQP);
        }
        bufferedOutput.flush();

        // Start thread that blast packets in the queue to learner
        startSendingPackets();

        /*
            * Have to wait for the first ACK, wait until
            * the learnerMaster is ready, and only then we can
            * start processing messages.
            */
        qp = new QuorumPacket();
        ia.readRecord(qp, "packet");

        messageTracker.trackReceived(qp.getType());
        if (qp.getType() != Leader.ACK) {
            LOG.error("Next packet was supposed to be an ACK, but received packet: {}", packetToString(qp));
            return;
        }

        LOG.debug("Received NEWLEADER-ACK message from {}", sid);

        learnerMaster.waitForNewLeaderAck(getSid(), qp.getZxid());

        syncLimitCheck.start();
        // sync ends when NEWLEADER-ACK is received
        syncThrottler.endSync();
        if (needSnap) {
            ServerMetrics.getMetrics().INFLIGHT_SNAP_COUNT.add(syncThrottler.getSyncInProgress());
        } else {
            ServerMetrics.getMetrics().INFLIGHT_DIFF_COUNT.add(syncThrottler.getSyncInProgress());
        }
        syncThrottler = null;

        // now that the ack has been processed expect the syncLimit
        sock.setSoTimeout(learnerMaster.syncTimeout());

        /*
        * Wait until learnerMaster starts up
        */
        learnerMaster.waitForStartup();

        // Mutation packets will be queued during the serialize,
        // so we need to mark when the peer can actually start
        // using the data
        //
        LOG.debug("Sending UPTODATE message to {}", sid);
        queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));

        while (true) {
            ......
        }
    } catch (IOException e) {
        ......
    } finally {
        ......
    }
}
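The first part of `run()` decodes the FOLLOWERINFO/OBSERVERINFO payload by position: an 8-byte sid, then a 4-byte protocol version, then an 8-byte config version. Each field is read only if the payload is long enough. The protocol version then decides whether the LEADERINFO/ACKEPOCH exchange runs at all (`getVersion() < 0x10000` skips it). The sketch below re-creates just that decoding and the config-version check. `LearnerInfoSketch` and its placeholder defaults are hypothetical. It also does not model the case of a missing payload, where the real code assigns a sid via `getAndDecrementFollowerCounter()`.

```java
import java.nio.ByteBuffer;

// Hypothetical LearnerInfoSketch: re-creates the positional decoding that
// LearnerHandler.run() applies to the FOLLOWERINFO/OBSERVERINFO payload.
public final class LearnerInfoSketch {

    static final class LearnerInfo {
        final long sid;
        final int protocolVersion;
        final long configVersion;

        LearnerInfo(long sid, int protocolVersion, long configVersion) {
            this.sid = sid;
            this.protocolVersion = protocolVersion;
            this.configVersion = configVersion;
        }
    }

    // Placeholder values for fields an older learner did not send
    // (chosen for this sketch, not taken from ZooKeeper).
    static final long UNKNOWN_SID = -1L;
    static final int UNKNOWN_VERSION = 0;
    static final long UNKNOWN_CONFIG = -1L;

    // Build a 20-byte payload: sid (8) + protocol version (4) + config version (8), big-endian.
    static byte[] encode(long sid, int protocolVersion, long configVersion) {
        return ByteBuffer.allocate(20).putLong(sid).putInt(protocolVersion).putLong(configVersion).array();
    }

    // Read each field only if the payload is long enough, mirroring the length checks above.
    static LearnerInfo decode(byte[] data) {
        ByteBuffer bb = ByteBuffer.wrap(data);
        long sid = data.length >= 8 ? bb.getLong() : UNKNOWN_SID;
        int version = data.length >= 12 ? bb.getInt() : UNKNOWN_VERSION;
        long configVersion = data.length >= 20 ? bb.getLong() : UNKNOWN_CONFIG;
        return new LearnerInfo(sid, version, configVersion);
    }

    // Reject a learner whose activated configuration is newer than the leader's.
    static void checkConfig(LearnerInfo info, long leaderConfigVersion) {
        if (info.configVersion > leaderConfigVersion) {
            throw new IllegalStateException("Follower is ahead of the leader (has a later activated configuration)");
        }
    }

    public static void main(String[] args) {
        LearnerInfo info = decode(encode(3L, 0x10000, 0x100000000L));
        System.out.println(info.sid);                                  // 3
        System.out.println(Integer.toHexString(info.protocolVersion)); // 10000
        checkConfig(info, 0x100000000L); // equal config versions are accepted
    }
}
```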
  2. The code that decides how to sync with the Follower: syncFollower() in LearnerHandler.java
java
boolean syncFollower(long peerLastZxid, LearnerMaster learnerMaster) {
    /*
        * When leader election is completed, the leader will set its
        * lastProcessedZxid to be (epoch << 32). There will be no txn associated
        * with this zxid.
        *
        * The learner will set its lastProcessedZxid to the same value if
        * it get DIFF or SNAP from the learnerMaster. If the same learner come
        * back to sync with learnerMaster using this zxid, we will never find this
        * zxid in our history. In this case, we will ignore TRUNC logic and
        * always send DIFF if we have old enough history
        */
    boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
    // Keep track of the latest zxid which already queued
    long currentZxid = peerLastZxid;
    boolean needSnap = true;
    ZKDatabase db = learnerMaster.getZKDatabase();
    boolean txnLogSyncEnabled = db.isTxnLogSyncEnabled();
    ReentrantReadWriteLock lock = db.getLogLock();
    ReadLock rl = lock.readLock();
    try {
        rl.lock();
        long maxCommittedLog = db.getmaxCommittedLog();
        long minCommittedLog = db.getminCommittedLog();
        long lastProcessedZxid = db.getDataTreeLastProcessedZxid();

        LOG.info("Synchronizing with Learner sid: {} maxCommittedLog=0x{}"
                    + " minCommittedLog=0x{} lastProcessedZxid=0x{}"
                    + " peerLastZxid=0x{}",
                    getSid(),
                    Long.toHexString(maxCommittedLog),
                    Long.toHexString(minCommittedLog),
                    Long.toHexString(lastProcessedZxid),
                    Long.toHexString(peerLastZxid));

        if (db.getCommittedLog().isEmpty()) {
            /*
                * It is possible that committedLog is empty. In that case
                * setting these value to the latest txn in learnerMaster db
                * will reduce the case that we need to handle
                *
                * Here is how each case handle by the if block below
                * 1. lastProcessZxid == peerZxid -> Handle by (2)
                * 2. lastProcessZxid < peerZxid -> Handle by (3)
                * 3. lastProcessZxid > peerZxid -> Handle by (5)
                */
            minCommittedLog = lastProcessedZxid;
            maxCommittedLog = lastProcessedZxid;
        }

        /*
            * Here are the cases that we want to handle
            *
            * 1. Force sending snapshot (for testing purpose)
            * 2. Peer and learnerMaster is already sync, send empty diff
            * 3. Follower has txn that we haven't seen. This may be old leader
            *    so we need to send TRUNC. However, if peer has newEpochZxid,
            *    we cannot send TRUNC since the follower has no txnlog
            * 4. Follower is within committedLog range or already in-sync.
            *    We may need to send DIFF or TRUNC depending on follower's zxid
            *    We always send empty DIFF if follower is already in-sync
            * 5. Follower missed the committedLog. We will try to use on-disk
            *    txnlog + committedLog to sync with follower. If that fail,
            *    we will send snapshot
            */

        if (forceSnapSync) {
            // Force learnerMaster to use snapshot to sync with follower
            LOG.warn("Forcing snapshot sync - should not see this in production");
        } else if (lastProcessedZxid == peerLastZxid) {
            // Follower is already sync with us, send empty diff
            LOG.info(
                "Sending DIFF zxid=0x{} for peer sid: {}",
                Long.toHexString(peerLastZxid),
                getSid());
            queueOpPacket(Leader.DIFF, peerLastZxid);
            needOpPacket = false;
            needSnap = false;
        } else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
            // Newer than committedLog, send trunc and done
            LOG.debug(
                "Sending TRUNC to follower zxidToSend=0x{} for peer sid:{}",
                Long.toHexString(maxCommittedLog),
                getSid());
            queueOpPacket(Leader.TRUNC, maxCommittedLog);
            currentZxid = maxCommittedLog;
            needOpPacket = false;
            needSnap = false;
        } else if ((maxCommittedLog >= peerLastZxid) && (minCommittedLog <= peerLastZxid)) {
            // Follower is within commitLog range
            LOG.info("Using committedLog for peer sid: {}", getSid());
            Iterator<Proposal> itr = db.getCommittedLog().iterator();
            currentZxid = queueCommittedProposals(itr, peerLastZxid, null, maxCommittedLog);
            needSnap = false;
        } else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
            // Use txnlog and committedLog to sync

            // Calculate sizeLimit that we allow to retrieve txnlog from disk
            long sizeLimit = db.calculateTxnLogSizeLimit();
            // This method can return empty iterator if the requested zxid
            // is older than on-disk txnlog
            Iterator<Proposal> txnLogItr = db.getProposalsFromTxnLog(peerLastZxid, sizeLimit);
            if (txnLogItr.hasNext()) {
                LOG.info("Use txnlog and committedLog for peer sid: {}", getSid());
                currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid, minCommittedLog, maxCommittedLog);

                if (currentZxid < minCommittedLog) {
                    LOG.info(
                        "Detected gap between end of txnlog: 0x{} and start of committedLog: 0x{}",
                        Long.toHexString(currentZxid),
                        Long.toHexString(minCommittedLog));
                    currentZxid = peerLastZxid;
                    // Clear out currently queued requests and revert
                    // to sending a snapshot.
                    queuedPackets.clear();
                    needOpPacket = true;
                } else {
                    LOG.debug("Queueing committedLog 0x{}", Long.toHexString(currentZxid));
                    Iterator<Proposal> committedLogItr = db.getCommittedLog().iterator();
                    currentZxid = queueCommittedProposals(committedLogItr, currentZxid, null, maxCommittedLog);
                    needSnap = false;
                }
            }
            // closing the resources
            if (txnLogItr instanceof TxnLogProposalIterator) {
                TxnLogProposalIterator txnProposalItr = (TxnLogProposalIterator) txnLogItr;
                txnProposalItr.close();
            }
        } else {
            LOG.warn(
                "Unhandled scenario for peer sid: {} maxCommittedLog=0x{}"
                    + " minCommittedLog=0x{} lastProcessedZxid=0x{}"
                    + " peerLastZxid=0x{} txnLogSyncEnabled={}",
                getSid(),
                Long.toHexString(maxCommittedLog),
                Long.toHexString(minCommittedLog),
                Long.toHexString(lastProcessedZxid),
                Long.toHexString(peerLastZxid),
                txnLogSyncEnabled);
        }
        if (needSnap) {
            currentZxid = db.getDataTreeLastProcessedZxid();
        }

        LOG.debug("Start forwarding 0x{} for peer sid: {}", Long.toHexString(currentZxid), getSid());
        leaderLastZxid = learnerMaster.startForwarding(this, currentZxid);
    } finally {
        rl.unlock();
    }

    if (needOpPacket && !needSnap) {
        // This should never happen, but we should fall back to sending
        // snapshot just in case.
        LOG.error("Unhandled scenario for peer sid: {} fall back to use snapshot",  getSid());
        needSnap = true;
    }

    return needSnap;
}
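Read together with its comment block, the branches of `syncFollower()` can be modeled as a pure function. The sketch below is a simplified, hypothetical model. `SyncDecisionSketch`, its enum labels, and the `txnLogCoversPeer` flag are inventions for illustration, not ZooKeeper APIs. `txnLogCoversPeer` stands for "the on-disk txnlog reaches back to `peerLastZxid` and joins the committedLog without a gap". The model only reports which path would be taken and queues no packets. It also ignores that `queueCommittedProposals()` may itself queue a TRUNC before the proposals when the follower's zxid is not found in the log.

```java
// Hypothetical, simplified model of the branch order in LearnerHandler.syncFollower().
// Only the decision is modeled: no packets are queued and no disk I/O happens.
public final class SyncDecisionSketch {

    enum SyncMode { EMPTY_DIFF, TRUNC, REPLAY_COMMITTED_LOG, REPLAY_TXNLOG_AND_COMMITTED_LOG, SNAP }

    static SyncMode decide(long peerLastZxid, long lastProcessedZxid,
                           long minCommittedLog, long maxCommittedLog,
                           boolean committedLogEmpty, boolean txnLogSyncEnabled,
                           boolean txnLogCoversPeer, boolean forceSnapSync) {
        // A zxid whose low 32 bits are 0 is the first zxid of a new epoch (no txn behind it).
        boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
        if (committedLogEmpty) {
            // Same normalization as the real code: collapse the range to lastProcessedZxid.
            minCommittedLog = lastProcessedZxid;
            maxCommittedLog = lastProcessedZxid;
        }
        if (forceSnapSync) {
            return SyncMode.SNAP;                              // case 1: testing only
        } else if (lastProcessedZxid == peerLastZxid) {
            return SyncMode.EMPTY_DIFF;                        // case 2: already in sync
        } else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
            return SyncMode.TRUNC;                             // case 3: peer ahead, roll back to maxCommittedLog
        } else if (peerLastZxid >= minCommittedLog && peerLastZxid <= maxCommittedLog) {
            return SyncMode.REPLAY_COMMITTED_LOG;              // case 4: replay from in-memory committedLog
        } else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled && txnLogCoversPeer) {
            return SyncMode.REPLAY_TXNLOG_AND_COMMITTED_LOG;   // case 5: on-disk txnlog + committedLog
        }
        return SyncMode.SNAP;                                  // everything else: full snapshot
    }

    public static void main(String[] args) {
        long min = 0x100000001L;
        long max = 0x100000005L;
        System.out.println(decide(0x100000005L, max, min, max, false, true, true, false)); // EMPTY_DIFF
        System.out.println(decide(0x100000003L, max, min, max, false, true, true, false)); // REPLAY_COMMITTED_LOG
        System.out.println(decide(0x100000009L, max, min, max, false, true, true, false)); // TRUNC
        System.out.println(decide(0L, max, min, max, false, true, false, false));          // SNAP
    }
}
```

A new-epoch zxid (counter 0) never takes the TRUNC branch. As the comment at the top of `syncFollower()` explains, no txn is associated with such a zxid, so the leader would never find it in its history.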

8. The Follower responds to the Leader's sync result

  1. Back on the Follower, handle the packets sent by the Leader (Follower.processPacket)
java
protected void processPacket(QuorumPacket qp) throws Exception {
    switch (qp.getType()) {
    case Leader.PING:
        ping(qp);
        break;
    case Leader.PROPOSAL:
        ServerMetrics.getMetrics().LEARNER_PROPOSAL_RECEIVED_COUNT.add(1);
        TxnLogEntry logEntry = SerializeUtils.deserializeTxn(qp.getData());
        TxnHeader hdr = logEntry.getHeader();
        Record txn = logEntry.getTxn();
        TxnDigest digest = logEntry.getDigest();
        if (hdr.getZxid() != lastQueued + 1) {
            LOG.warn(
                "Got zxid 0x{} expected 0x{}",
                Long.toHexString(hdr.getZxid()),
                Long.toHexString(lastQueued + 1));
        }
        lastQueued = hdr.getZxid();

        if (hdr.getType() == OpCode.reconfig) {
            SetDataTxn setDataTxn = (SetDataTxn) txn;
            QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData(), UTF_8));
            self.setLastSeenQuorumVerifier(qv, true);
        }

        fzk.logRequest(hdr, txn, digest);
        if (hdr != null) {
            /*
                * Request header is created only by the leader, so this is only set
                * for quorum packets. If there is a clock drift, the latency may be
                * negative. Headers use wall time, not CLOCK_MONOTONIC.
                */
            long now = Time.currentWallTime();
            long latency = now - hdr.getTime();
            if (latency >= 0) {
                ServerMetrics.getMetrics().PROPOSAL_LATENCY.add(latency);
            }
        }
        if (om != null) {
            final long startTime = Time.currentElapsedTime();
            om.proposalReceived(qp);
            ServerMetrics.getMetrics().OM_PROPOSAL_PROCESS_TIME.add(Time.currentElapsedTime() - startTime);
        }
        break;
    case Leader.COMMIT:
        ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
        fzk.commit(qp.getZxid());
        if (om != null) {
            final long startTime = Time.currentElapsedTime();
            om.proposalCommitted(qp.getZxid());
            ServerMetrics.getMetrics().OM_COMMIT_PROCESS_TIME.add(Time.currentElapsedTime() - startTime);
        }
        break;

    case Leader.COMMITANDACTIVATE:
        // get the new configuration from the request
        Request request = fzk.pendingTxns.element();
        SetDataTxn setDataTxn = (SetDataTxn) request.getTxn();
        QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData(), UTF_8));

        // get new designated leader from (current) leader's message
        ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
        long suggestedLeaderId = buffer.getLong();
        final long zxid = qp.getZxid();
        boolean majorChange = self.processReconfig(qv, suggestedLeaderId, zxid, true);
        // commit (writes the new config to ZK tree (/zookeeper/config)
        fzk.commit(zxid);

        if (om != null) {
            om.informAndActivate(zxid, suggestedLeaderId);
        }
        if (majorChange) {
            throw new Exception("changes proposed in reconfig");
        }
        break;
    case Leader.UPTODATE:
        LOG.error("Received an UPTODATE message after Follower started");
        break;
    case Leader.REVALIDATE:
        if (om == null || !om.revalidateLearnerSession(qp)) {
            revalidate(qp);
        }
        break;
    case Leader.SYNC:
        fzk.sync();
        break;
    default:
        LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
        break;
    }
}
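  2. Completing the sync: commit() in FollowerZooKeeperServer hands the matching pending request to the CommitProcessor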
java
public void commit(long zxid) {
    if (pendingTxns.size() == 0) {
        LOG.warn("Committing " + Long.toHexString(zxid) + " without seeing txn");
        return;
    }
    long firstElementZxid = pendingTxns.element().zxid;
    if (firstElementZxid != zxid) {
        LOG.error("Committing zxid 0x" + Long.toHexString(zxid)
                    + " but next pending txn 0x" + Long.toHexString(firstElementZxid));
        ServiceUtils.requestSystemExit(ExitCode.UNMATCHED_TXN_COMMIT.getValue());
    }
    Request request = pendingTxns.remove();
    request.logLatency(ServerMetrics.getMetrics().COMMIT_PROPAGATION_LATENCY);
    commitProcessor.commit(request);
}
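Taken together, `processPacket()` and `commit()` enforce a simple invariant. Each PROPOSAL is appended to `pendingTxns`, and each COMMIT must name the zxid at the head of that queue. The sketch below models just that FIFO pairing. `PendingTxnSketch` is hypothetical, and so is the choice to pass in the starting `lastQueued` value. Where the real server only logs a warning or calls `ServiceUtils.requestSystemExit`, the sketch returns a flag or throws.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical PendingTxnSketch: models the follower-side FIFO that pairs
// PROPOSAL packets with later COMMIT packets.
public final class PendingTxnSketch {

    private final Deque<Long> pendingTxns = new ArrayDeque<>();
    private long lastQueued;   // starting value is supplied by the caller (an assumption of this sketch)
    private int gapWarnings;   // counts the "Got zxid ... expected ..." situations

    PendingTxnSketch(long lastQueued) {
        this.lastQueued = lastQueued;
    }

    // PROPOSAL: note a gap (the real code only logs a warning), then queue the txn.
    void onProposal(long zxid) {
        if (zxid != lastQueued + 1) {
            gapWarnings++;
        }
        lastQueued = zxid;
        pendingTxns.addLast(zxid);
    }

    // COMMIT: must match the oldest pending txn.
    // Returns false when nothing is pending (the real code logs a warning and returns);
    // throws on a mismatch (the real code requests a system exit instead).
    boolean onCommit(long zxid) {
        Long head = pendingTxns.peekFirst();
        if (head == null) {
            return false;
        }
        if (head != zxid) {
            throw new IllegalStateException("Committing zxid 0x" + Long.toHexString(zxid)
                    + " but next pending txn 0x" + Long.toHexString(head));
        }
        pendingTxns.removeFirst();
        return true;
    }

    int gapWarnings() {
        return gapWarnings;
    }

    public static void main(String[] args) {
        PendingTxnSketch f = new PendingTxnSketch(0x100000000L);
        f.onProposal(0x100000001L);
        f.onProposal(0x100000002L);
        System.out.println(f.onCommit(0x100000001L)); // true
        System.out.println(f.onCommit(0x100000002L)); // true
        System.out.println(f.onCommit(0x100000003L)); // false: nothing pending
        System.out.println(f.gapWarnings());          // 0
    }
}
```

The FIFO check works because the leader sends proposals and commits in zxid order over a single TCP connection. An out-of-order COMMIT therefore means the follower's state can no longer be trusted, which is why the real server exits instead of continuing.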

9. The Leader responds to the Follower

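Finally, back on the Leader in LearnerHandler.java, run() enters its main loop, where it handles the Follower's ACK, PING, REVALIDATE, and REQUEST packets: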

java
@Override
public void run() {
    try {
        // ...... FOLLOWERINFO/LEADERINFO/ACKEPOCH handshake, data sync, NEWLEADER and UPTODATE:
        // identical to the LearnerHandler.run() listing in step 7
        ......

        while (true) {
            qp = new QuorumPacket();
            ia.readRecord(qp, "packet");
            messageTracker.trackReceived(qp.getType());

            if (LOG.isTraceEnabled()) {
                long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
                if (qp.getType() == Leader.PING) {
                    traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
                }
                ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
            }
            tickOfNextAckDeadline = learnerMaster.getTickOfNextAckDeadline();

            packetsReceived.incrementAndGet();

            ByteBuffer bb;
            long sessionId;
            int cxid;
            int type;

            switch (qp.getType()) {
            case Leader.ACK:
                if (this.learnerType == LearnerType.OBSERVER) {
                    LOG.debug("Received ACK from Observer {}", this.sid);
                }
                syncLimitCheck.updateAck(qp.getZxid());
                learnerMaster.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
                break;
            case Leader.PING:
                // Process the touches
                ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
                DataInputStream dis = new DataInputStream(bis);
                while (dis.available() > 0) {
                    long sess = dis.readLong();
                    int to = dis.readInt();
                    learnerMaster.touch(sess, to);
                }
                break;
            case Leader.REVALIDATE:
                ServerMetrics.getMetrics().REVALIDATE_COUNT.add(1);
                learnerMaster.revalidateSession(qp, this);
                break;
            case Leader.REQUEST:
                bb = ByteBuffer.wrap(qp.getData());
                sessionId = bb.getLong();
                cxid = bb.getInt();
                type = bb.getInt();
                bb = bb.slice();
                Request si;
                if (type == OpCode.sync) {
                    si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
                } else {
                    si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
                }
                si.setOwner(this);
                learnerMaster.submitLearnerRequest(si);
                requestsReceived.incrementAndGet();
                break;
            default:
                LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));
                break;
            }
        }
    } catch (IOException e) {
        LOG.error("Unexpected exception in LearnerHandler: ", e);
        closeSocket();
    } catch (InterruptedException e) {
        LOG.error("Unexpected exception in LearnerHandler.", e);
    } catch (SyncThrottleException e) {
        LOG.error("too many concurrent sync.", e);
        syncThrottler = null;
    } catch (Exception e) {
        LOG.error("Unexpected exception in LearnerHandler.", e);
        throw e;
    } finally {
        if (syncThrottler != null) {
            syncThrottler.endSync();
            syncThrottler = null;
        }
        String remoteAddr = getRemoteAddress();
        LOG.warn("******* GOODBYE {} ********", remoteAddr);
        messageTracker.dumpToLog(remoteAddr);
        shutdown();
    }
}
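The main loop forwards every ACK to `learnerMaster.processAck(...)`. The state-sync phase ends (`waitForNewLeaderAck`), and ordinary proposals are committed, only once more than half of the voting members have acknowledged. The leader's own ACK counts toward that majority. Below is a minimal sketch of the majority rule, assuming static membership and ignoring dynamic reconfiguration. `AckQuorumSketch` is hypothetical and is not ZooKeeper's `QuorumVerifier`.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical AckQuorumSketch: a strict-majority rule over voting members,
// in the spirit of the quorum check the leader runs after processAck(...).
public final class AckQuorumSketch {

    private final Set<Long> votingMembers;
    private final Map<Long, Set<Long>> acksByZxid = new HashMap<>();

    AckQuorumSketch(Collection<Long> votingMembers) {
        this.votingMembers = new HashSet<>(votingMembers);
    }

    // Record an ACK from server `sid` for `zxid` and report whether a strict
    // majority of voting members has acked that zxid so far.
    boolean processAck(long sid, long zxid) {
        Set<Long> acks = acksByZxid.computeIfAbsent(zxid, z -> new HashSet<>());
        if (votingMembers.contains(sid)) {
            acks.add(sid); // non-voters (e.g. observers) do not count toward the quorum
        }
        return acks.size() * 2 > votingMembers.size();
    }

    public static void main(String[] args) {
        AckQuorumSketch q = new AckQuorumSketch(Arrays.asList(1L, 2L, 3L));
        long zxid = 0x100000001L;
        System.out.println(q.processAck(1L, zxid)); // false: 1 of 3
        System.out.println(q.processAck(4L, zxid)); // false: sid 4 is not a voter
        System.out.println(q.processAck(2L, zxid)); // true: 2 of 3 is a majority
    }
}
```

Requiring a strict majority ensures that any two quorums overlap in at least one server. A later leader elected by a quorum therefore always includes some server that saw every committed proposal.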