Skip to content

服务端Leader启动

1. Leader启动工作内容

Alt text

2. 启动Zookeeper服务

  1. lead()方法 Leader.java
java
void lead() throws IOException, InterruptedException {
    ......

    try {
        ......

        startZkServer();

        ......
    } finally {
        ......
    }
}
  1. 启动zk服务方法 Leader.java
java
private synchronized void startZkServer() {
    // Update lastCommitted and Db's zxid to a value representing the new epoch
    lastCommitted = zk.getZxid();
    LOG.info("Have quorum of supporters, sids: [{}]; starting up and setting last processed zxid: 0x{}",
                newLeaderProposal.ackSetsToString(),
                Long.toHexString(zk.getZxid()));

    if (self.isReconfigEnabled()) {
        /*
            * ZOOKEEPER-1324. the leader sends the new config it must complete
            *  to others inside a NEWLEADER message (see LearnerHandler where
            *  the NEWLEADER message is constructed), and once it has enough
            *  acks we must execute the following code so that it applies the
            *  config to itself.
            */
        QuorumVerifier newQV = self.getLastSeenQuorumVerifier();

        Long designatedLeader = getDesignatedLeader(newLeaderProposal, zk.getZxid());

        self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
        if (designatedLeader != self.getMyId()) {
            LOG.warn("This leader is not the designated leader, it will be initialized with allowedToCommit = false");
            allowedToCommit = false;
        }
    } else {
        LOG.info("Dynamic reconfig feature is disabled, skip designatedLeader calculation and reconfig processing.");
    }

    leaderStartTime = Time.currentElapsedTime();
    // 启动 Zookeeper
    zk.startup();
    /*
        * Update the election vote here to ensure that all members of the
        * ensemble report the same vote to new servers that start up and
        * send leader election notifications to the ensemble.
        *
        * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
        */
    self.updateElectionVote(getEpoch());

    zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
}
  1. 启动方法调用父类启动 LeaderZooKeeperServer.java
java
@Override
public synchronized void startup() {
    // 调用父类启动方法
    super.startup();
    if (containerManager != null) {
        containerManager.start();
    }
}
  1. 启动具体需要的项目内容
    ZooKeeperServer.java
java
public synchronized void startup() {
    startupWithServerState(State.RUNNING);
}

private void startupWithServerState(State state) {
    if (sessionTracker == null) {
        createSessionTracker();
    }
    startSessionTracker();
    setupRequestProcessors();

    startRequestThrottler();

    registerJMX();

    startJvmPauseMonitor();

    registerMetrics();

    setState(state);

    requestPathMetricsCollector.start();

    localSessionEnabled = sessionTracker.isLocalSessionsEnabled();

    notifyAll();
}

protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
    ((SyncRequestProcessor) syncProcessor).start();
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    ((PrepRequestProcessor) firstProcessor).start();
}
  1. PrepRequestProcessor是线程类,查看run()方法
    PrepRequestProcessor.java
java
@Override
public void run() {
    LOG.info(String.format("PrepRequestProcessor (sid:%d) started, reconfigEnabled=%s", zks.getServerId(), zks.reconfigEnabled));
    try {
        while (true) {
            ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_SIZE.add(submittedRequests.size());
            Request request = submittedRequests.take();
            ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_TIME
                .add(Time.currentElapsedTime() - request.prepQueueStartTime);
            if (LOG.isTraceEnabled()) {
                long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
                if (request.type == OpCode.ping) {
                    traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
                }
                ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
            }
            if (Request.requestOfDeath == request) {
                break;
            }

            request.prepStartTime = Time.currentElapsedTime();
            pRequest(request);
        }
    } catch (Exception e) {
        handleException(this.getName(), e);
    }
    LOG.info("PrepRequestProcessor exited loop!");
}
  1. 处理请求
java
protected void pRequest(Request request) throws RequestProcessorException {
    // LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
    // request.type + " id = 0x" + Long.toHexString(request.sessionId));
    request.setHdr(null);
    request.setTxn(null);

    if (!request.isThrottled()) {
        pRequestHelper(request);
    }

    request.zxid = zks.getZxid();
    long timeFinishedPrepare = Time.currentElapsedTime();
    ServerMetrics.getMetrics().PREP_PROCESS_TIME.add(timeFinishedPrepare - request.prepStartTime);
    nextProcessor.processRequest(request);
    ServerMetrics.getMetrics().PROPOSAL_PROCESS_TIME.add(Time.currentElapsedTime() - timeFinishedPrepare);
}
  1. 具体处理请求
java
private void pRequestHelper(Request request) throws RequestProcessorException {
    try {
        switch (request.type) {
        case OpCode.createContainer:
        case OpCode.create:
        case OpCode.create2:
            CreateRequest create2Request = new CreateRequest();
            pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
            break;
        case OpCode.createTTL:
            CreateTTLRequest createTtlRequest = new CreateTTLRequest();
            pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest, true);
            break;
        case OpCode.deleteContainer:
        case OpCode.delete:
            DeleteRequest deleteRequest = new DeleteRequest();
            pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
            break;
        case OpCode.setData:
            SetDataRequest setDataRequest = new SetDataRequest();
            pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
            break;
        case OpCode.reconfig:
            ReconfigRequest reconfigRequest = new ReconfigRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request, reconfigRequest);
            pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);
            break;
        case OpCode.setACL:
            SetACLRequest setAclRequest = new SetACLRequest();
            pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);
            break;
        case OpCode.check:
            CheckVersionRequest checkRequest = new CheckVersionRequest();
            pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);
            break;
        case OpCode.multi:
            MultiOperationRecord multiRequest = new MultiOperationRecord();
            try {
                ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
            } catch (IOException e) {
                request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(), Time.currentWallTime(), OpCode.multi));
                throw e;
            }
            List<Txn> txns = new ArrayList<Txn>();
            //Each op in a multi-op must have the same zxid!
            long zxid = zks.getNextZxid();
            KeeperException ke = null;

            //Store off current pending change records in case we need to rollback
            Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);
            request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
                    Time.currentWallTime(), request.type));

            for (Op op : multiRequest) {
                Record subrequest = op.toRequestRecord();
                int type;
                Record txn;

                /* If we've already failed one of the ops, don't bother
                    * trying the rest as we know it's going to fail and it
                    * would be confusing in the logfiles.
                    */
                if (ke != null) {
                    type = OpCode.error;
                    txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
                } else {
                    /* Prep the request and convert to a Txn */
                    try {
                        pRequest2Txn(op.getType(), zxid, request, subrequest, false);
                        type = op.getType();
                        txn = request.getTxn();
                    } catch (KeeperException e) {
                        ke = e;
                        type = OpCode.error;
                        txn = new ErrorTxn(e.code().intValue());

                        if (e.code().intValue() > Code.APIERROR.intValue()) {
                            LOG.info("Got user-level KeeperException when processing {} aborting"
                                        + " remaining multi ops. Error Path:{} Error:{}",
                                        request.toString(),
                                        e.getPath(),
                                        e.getMessage());
                        }

                        request.setException(e);

                        /* Rollback change records from failed multi-op */
                        rollbackPendingChanges(zxid, pendingChanges);
                    }
                }

                // TODO: I don't want to have to serialize it here and then
                //       immediately deserialize in next processor. But I'm
                //       not sure how else to get the txn stored into our list.
                try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
                    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                    txn.serialize(boa, "request");
                    ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
                    txns.add(new Txn(type, bb.array()));
                }
            }

            request.setTxn(new MultiTxn(txns));
            if (digestEnabled) {
                setTxnDigest(request);
            }

            break;

        //create/close session don't require request record
        case OpCode.createSession:
        case OpCode.closeSession:
            if (!request.isLocalSession()) {
                pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
            }
            break;

        //All the rest don't need to create a Txn - just verify session
        case OpCode.sync:
        case OpCode.exists:
        case OpCode.getData:
        case OpCode.getACL:
        case OpCode.getChildren:
        case OpCode.getAllChildrenNumber:
        case OpCode.getChildren2:
        case OpCode.ping:
        case OpCode.setWatches:
        case OpCode.setWatches2:
        case OpCode.checkWatches:
        case OpCode.removeWatches:
        case OpCode.getEphemerals:
        case OpCode.multiRead:
        case OpCode.addWatch:
        case OpCode.whoAmI:
            zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
            break;
        default:
            LOG.warn("unknown type {}", request.type);
            break;
        }
    } catch (KeeperException e) {
        if (request.getHdr() != null) {
            request.getHdr().setType(OpCode.error);
            request.setTxn(new ErrorTxn(e.code().intValue()));
        }

        if (e.code().intValue() > Code.APIERROR.intValue()) {
            LOG.info(
                "Got user-level KeeperException when processing {} Error Path:{} Error:{}",
                request.toString(),
                e.getPath(),
                e.getMessage());
        }
        request.setException(e);
    } catch (Exception e) {
        // log at error level as we are returning a marshalling
        // error to the user
        LOG.error("Failed to process {}", request, e);

        StringBuilder sb = new StringBuilder();
        ByteBuffer bb = request.request;
        if (bb != null) {
            bb.rewind();
            while (bb.hasRemaining()) {
                sb.append(String.format("%02x", (0xff & bb.get())));
            }
        } else {
            sb.append("request buffer is null");
        }

        LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), sb);
        if (request.getHdr() != null) {
            request.getHdr().setType(OpCode.error);
            request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
        }
    }
}