Skip to content

服务端Follower启动

1. Follower启动工作内容

（图：Follower 启动工作内容示意图——原图片未能保留）

2. followLeader()方法

2.1 Follower.java 中的 followLeader()
java
/**
 * Runs the follower side of the protocol against the current leader.
 *
 * <p>The phases are marked via {@code self.setZabState}:
 * <ul>
 *   <li>DISCOVERY: find the leader, connect to it, and register with FOLLOWERINFO to obtain
 *       the new epoch zxid.</li>
 *   <li>SYNCHRONIZATION: {@code syncWithLeader}.</li>
 *   <li>BROADCAST: packets from the leader are read and processed until {@code isRunning()}
 *       returns false or an exception is thrown.</li>
 * </ul>
 * An ObserverMaster is started only when an observer master port is configured.
 *
 * <p>Any exception raised after the leader has been found is logged. The socket is then closed
 * and pending revalidations are cleared. Further cleanup happens in the (elided) finally block.
 *
 * @throws InterruptedException declared by the method; the throwing call is not visible in
 *     this excerpt
 */
void followLeader() throws InterruptedException {
    ......

    long connectionTime = 0;
    boolean completedSync = false;

    try {
        self.setZabState(QuorumPeer.ZabState.DISCOVERY);
        QuorumServer leaderServer = findLeader();
        try {
            connectToLeader(leaderServer.addr, leaderServer.hostname);
            connectionTime = System.currentTimeMillis();
            long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
            if (self.isReconfigStateChange()) {
                throw new Exception("learned about role change");
            }
            // Safety check: the epoch proposed by the leader must not be lower than the epoch
            // this peer has already accepted. This should never happen.
            long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
            if (newEpoch < self.getAcceptedEpoch()) {
                // getAcceptedEpoch() is an epoch (it is compared with newEpoch above), not a zxid,
                // so it is printed as a plain hex number instead of being formatted as a zxid.
                LOG.error(
                    "Proposed leader epoch 0x{} (zxid {}) is less than our accepted epoch 0x{}",
                    Long.toHexString(newEpoch),
                    ZxidUtils.zxidToString(newEpochZxid),
                    Long.toHexString(self.getAcceptedEpoch()));
                throw new IOException("Error: Epoch of leader is lower");
            }
            long startTime = Time.currentElapsedTime();
            self.setLeaderAddressAndId(leaderServer.addr, leaderServer.getId());
            self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
            syncWithLeader(newEpochZxid);
            self.setZabState(QuorumPeer.ZabState.BROADCAST);
            completedSync = true;
            long syncTime = Time.currentElapsedTime() - startTime;
            ServerMetrics.getMetrics().FOLLOWER_SYNC_TIME.add(syncTime);
            if (self.getObserverMasterPort() > 0) {
                LOG.info("Starting ObserverMaster");

                om = new ObserverMaster(self, fzk, self.getObserverMasterPort());
                om.start();
            } else {
                om = null;
            }
            // Reuse one packet object for every read to reduce GC pressure.
            QuorumPacket qp = new QuorumPacket();
            while (this.isRunning()) {
                readPacket(qp);
                processPacket(qp);
            }
        } catch (Exception e) {
            LOG.warn("Exception when following the leader", e);
            closeSocket();

            // clear pending revalidations
            pendingRevalidations.clear();
        }
    } finally {
        ......
    }
}
2.2 读取 Packet（readPacket）
java
/**
 * Reads the next packet from the leader stream into {@code pp} and records its type.
 *
 * <p>The read and the {@code messageTracker} bookkeeping both run while holding the monitor of
 * {@code leaderIs}. When trace logging is enabled, the packet is also traced as incoming
 * ({@code 'i'}). PING packets use the ping trace mask; all other packets use the generic
 * packet trace mask.
 *
 * @param pp reusable packet object that is overwritten with the data just read
 * @throws IOException if reading from the leader stream fails
 */
void readPacket(QuorumPacket pp) throws IOException {
    synchronized (leaderIs) {
        leaderIs.readRecord(pp, "packet");
        messageTracker.trackReceived(pp.getType());
    }

    if (!LOG.isTraceEnabled()) {
        return;
    }

    long mask;
    if (pp.getType() == Leader.PING) {
        mask = ZooTrace.SERVER_PING_TRACE_MASK;
    } else {
        mask = ZooTrace.SERVER_PACKET_TRACE_MASK;
    }
    ZooTrace.logQuorumPacket(LOG, mask, 'i', pp);
}
2.3 按照类型处理消息（processPacket）
java
/**
 * Dispatches one packet received from the leader according to its type.
 *
 * <ul>
 *   <li>PING: handled by {@code ping}.</li>
 *   <li>PROPOSAL: deserialized, checked against the expected next zxid, and passed to
 *       {@code fzk.logRequest}.</li>
 *   <li>COMMIT: commits the packet's zxid.</li>
 *   <li>COMMITANDACTIVATE: applies and commits a reconfig, and throws if it is a major
 *       change.</li>
 *   <li>REVALIDATE: handled by the ObserverMaster if it accepts the request, otherwise
 *       by {@code revalidate}.</li>
 *   <li>SYNC: forwarded to {@code fzk.sync()}.</li>
 *   <li>UPTODATE (unexpected at this stage) and unknown types: only logged.</li>
 * </ul>
 * When an ObserverMaster ({@code om}) is running, proposals, commits and activations are also
 * forwarded to it.
 *
 * @param qp the packet just read from the leader
 * @throws Exception if processing fails, or with message "changes proposed in reconfig" when a
 *     committed reconfig is reported as a major change by {@code processReconfig}
 */
protected void processPacket(QuorumPacket qp) throws Exception {
    switch (qp.getType()) {
    case Leader.PING:
        ping(qp);
        break;
    case Leader.PROPOSAL: {
        ServerMetrics.getMetrics().LEARNER_PROPOSAL_RECEIVED_COUNT.add(1);
        TxnLogEntry logEntry = SerializeUtils.deserializeTxn(qp.getData());
        TxnHeader hdr = logEntry.getHeader();
        Record txn = logEntry.getTxn();
        TxnDigest digest = logEntry.getDigest();
        // Proposals are expected in consecutive zxid order. A gap is only logged, not rejected.
        if (hdr.getZxid() != lastQueued + 1) {
            LOG.warn(
                "Got zxid 0x{} expected 0x{}",
                Long.toHexString(hdr.getZxid()),
                Long.toHexString(lastQueued + 1));
        }
        lastQueued = hdr.getZxid();

        if (hdr.getType() == OpCode.reconfig) {
            SetDataTxn setDataTxn = (SetDataTxn) txn;
            QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData(), UTF_8));
            self.setLastSeenQuorumVerifier(qv, true);
        }

        fzk.logRequest(hdr, txn, digest);

        // hdr has already been dereferenced above, so it cannot be null here.
        // The request header is created only by the leader, and its time uses wall-clock time
        // rather than a monotonic clock. Clock drift between servers can therefore make the
        // latency negative; such samples are skipped.
        long latency = Time.currentWallTime() - hdr.getTime();
        if (latency >= 0) {
            ServerMetrics.getMetrics().PROPOSAL_LATENCY.add(latency);
        }

        if (om != null) {
            final long startTime = Time.currentElapsedTime();
            om.proposalReceived(qp);
            ServerMetrics.getMetrics().OM_PROPOSAL_PROCESS_TIME.add(Time.currentElapsedTime() - startTime);
        }
        break;
    }
    case Leader.COMMIT:
        ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
        fzk.commit(qp.getZxid());
        if (om != null) {
            final long startTime = Time.currentElapsedTime();
            om.proposalCommitted(qp.getZxid());
            ServerMetrics.getMetrics().OM_COMMIT_PROCESS_TIME.add(Time.currentElapsedTime() - startTime);
        }
        break;

    case Leader.COMMITANDACTIVATE: {
        // Read the new configuration from the head of pendingTxns.
        // NOTE(review): this assumes the head is the reconfig proposal being committed;
        // confirm pendingTxns ordering. element() throws if the queue is empty.
        Request request = fzk.pendingTxns.element();
        SetDataTxn setDataTxn = (SetDataTxn) request.getTxn();
        QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData(), UTF_8));

        // The payload of the current leader's message carries the newly designated leader id.
        ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
        long suggestedLeaderId = buffer.getLong();
        final long zxid = qp.getZxid();
        boolean majorChange = self.processReconfig(qv, suggestedLeaderId, zxid, true);
        // commit (writes the new config to the ZK tree at /zookeeper/config)
        fzk.commit(zxid);

        if (om != null) {
            om.informAndActivate(zxid, suggestedLeaderId);
        }
        if (majorChange) {
            // Caught in followLeader(), which closes the socket to the leader.
            throw new Exception("changes proposed in reconfig");
        }
        break;
    }
    case Leader.UPTODATE:
        LOG.error("Received an UPTODATE message after Follower started");
        break;
    case Leader.REVALIDATE:
        if (om == null || !om.revalidateLearnerSession(qp)) {
            revalidate(qp);
        }
        break;
    case Leader.SYNC:
        fzk.sync();
        break;
    default:
        LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
        break;
    }
}