Skip to content

源码之ZK服务端初始化

1. ZK服务端初始化总流程

Alt text

2. ZK服务端启动脚本分析

  1. ZK服务的启动脚本是zkServer.sh, 启动命令是:zkServer.sh start
sh
#!/usr/bin/env bash
ZOOBIN="${BASH_SOURCE-$0}"
ZOOBIN="$(dirname "${ZOOBIN}")"
ZOOBINDIR="$(cd "${ZOOBIN}"; pwd)"
....
nohup "$JAVA" 
+ 一堆提交参数
+ $ZOOMAIN(org.apache.zookeeper.server.quorum.QuorumPeerMain)
+ "$ZOOCFG" (zkEnv.sh 文件中 ZOOCFG="zoo.cfg")
....

可以看出程序的入口是QuorumPeerMain.java类。
2. ZK服务端启动入口 QuorumPeerMain.java源码:

java
/**
 * Process entry point: builds a QuorumPeerMain and hands the command-line
 * arguments to {@code initializeAndRun}, then logs and requests a normal exit.
 *
 * @param args command-line arguments, forwarded unchanged to initializeAndRun
 */
public static void main(String[] args) {
    // Create the object that represents this ZK node.
    QuorumPeerMain main = new QuorumPeerMain();
    try {
        // Initialize the node and run it; args corresponds to the zoo.cfg path given on the command line.
        main.initializeAndRun(args);
    } catch (IllegalArgumentException e) {
        ......
    }
    LOG.info("Exiting normally");
    ServiceUtils.requestSystemExit(ExitCode.EXECUTION_FINISHED.getValue());
}
  1. initializeAndRun()方法源码
java
/**
 * Parses the configuration when exactly one argument is supplied, starts the
 * data-directory purge manager, and then runs the server either as a quorum
 * member or in standalone mode.
 *
 * @param args command-line arguments; a single element is treated as the config file path
 * @throws ConfigException if the configuration cannot be parsed
 * @throws IOException declared by the quorum start-up path
 * @throws AdminServerException declared by the quorum start-up path
 */
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
    // Holder for the ZK configuration.
    QuorumPeerConfig config = new QuorumPeerConfig();
    final boolean hasConfigPath = args.length == 1;
    if (hasConfigPath) {
        // Step 1: parse the arguments (zoo.cfg and myid).
        config.parse(args[0]);
    }

    // Step 2: start the scheduled task that deletes expired snapshots
    // (this feature is disabled by default).
    new DatadirCleanupManager(
        config.getDataDir(),
        config.getDataLogDir(),
        config.getSnapRetainCount(),
        config.getPurgeInterval()).start();

    if (!hasConfigPath || !config.isDistributed()) {
        LOG.warn("Either no config or no quorum defined in config, running in standalone mode");
        // Only one server in the quorum: run as standalone.
        ZooKeeperServerMain.main(args);
        return;
    }
    // Step 3: start as a member of the cluster.
    runFromConfig(config);
}

3. 解析参数zoo.cfg和myid文件

  1. QuorumPeerConfig.java源码:
java
/**
 * Loads the configuration file at {@code path} into a Properties object,
 * records the path and the raw file text, and delegates to
 * {@code parseProperties} for the per-key handling.
 *
 * @param path location of the configuration file
 * @throws ConfigException wrapping any IOException or IllegalArgumentException
 *         raised while reading or parsing the file
 */
public void parse(String path) throws ConfigException {
    LOG.info("Reading configuration from: " + path);
    try {
        // Validate the path: warn if it is relative, fail if it does not exist.
        File configFile = (new VerifyingFileFactory.Builder(LOG)
            .warnForRelativePath()
            .failForNonExistingPath()
            .build()).create(path);

        Properties cfg = new Properties();
        try (FileInputStream in = new FileInputStream(configFile)) {
            // Load the configuration file.
            cfg.load(in);
            configFileStr = path;
        }

        /* Read entire config file as initial configuration */
        initialConfig = new String(Files.readAllBytes(configFile.toPath()));
        // Parse the loaded properties.
        parseProperties(cfg);
    } catch (IOException e) {
        throw new ConfigException("Error processing " + path, e);
    } catch (IllegalArgumentException e) {
        throw new ConfigException("Error processing " + path, e);
    }
    ......
}
  1. parseProperties()方法具体解析参数
java
/**
 * Walks every entry of {@code zkProp} and stores recognized keys into the
 * matching configuration field (keys and values are trimmed first). When no
 * dynamic config file is set, it then finishes the quorum setup via
 * {@code setupQuorumPeerConfig}.
 *
 * @param zkProp properties loaded from the configuration file
 * @throws IOException declared by the quorum setup step
 * @throws ConfigException if {@code electionAlg} is anything other than 3
 */
public void parseProperties(Properties zkProp) throws IOException, ConfigException {
    int clientPort = 0;
    int secureClientPort = 0;
    int observerMasterPort = 0;
    String clientPortAddress = null;
    String secureClientPortAddress = null;
    // Factory used for directory values; it warns on relative paths.
    VerifyingFileFactory vff = new VerifyingFileFactory.Builder(LOG).warnForRelativePath().build();
    for (Entry<Object, Object> entry : zkProp.entrySet()) {
        String key = entry.getKey().toString().trim();
        String value = entry.getValue().toString().trim();
        if (key.equals("dataDir")) {
            dataDir = vff.create(value);
        } else if (key.equals("dataLogDir")) {
            dataLogDir = vff.create(value);
        } else if (key.equals("clientPort")) {
            clientPort = Integer.parseInt(value);
        } else if (key.equals("localSessionsEnabled")) {
            localSessionsEnabled = parseBoolean(key, value);
        } else if (key.equals("localSessionsUpgradingEnabled")) {
            localSessionsUpgradingEnabled = parseBoolean(key, value);
        } else if (key.equals("clientPortAddress")) {
            // value was already trimmed above, so this trim() is a no-op.
            clientPortAddress = value.trim();
        } else if (key.equals("secureClientPort")) {
            secureClientPort = Integer.parseInt(value);
        } else if (key.equals("secureClientPortAddress")) {
            // value was already trimmed above, so this trim() is a no-op.
            secureClientPortAddress = value.trim();
        } else if (key.equals("observerMasterPort")) {
            observerMasterPort = Integer.parseInt(value);
        } else if (key.equals("clientPortListenBacklog")) {
            clientPortListenBacklog = Integer.parseInt(value);
        } else if (key.equals("tickTime")) {
            tickTime = Integer.parseInt(value);
        } else if (key.equals("maxClientCnxns")) {
            maxClientCnxns = Integer.parseInt(value);
        } else if (key.equals("minSessionTimeout")) {
            minSessionTimeout = Integer.parseInt(value);
        } else if (key.equals("maxSessionTimeout")) {
            maxSessionTimeout = Integer.parseInt(value);
        } else if (key.equals("initLimit")) {
            initLimit = Integer.parseInt(value);
        } else if (key.equals("syncLimit")) {
            syncLimit = Integer.parseInt(value);
        } else if (key.equals("connectToLearnerMasterLimit")) {
            connectToLearnerMasterLimit = Integer.parseInt(value);
        } else if (key.equals("electionAlg")) {
            electionAlg = Integer.parseInt(value);
            // Only election algorithm 3 is accepted; any other value is rejected.
            if (electionAlg != 3) {
                throw new ConfigException("Invalid electionAlg value. Only 3 is supported.");
            }
        }
    ......
    }
    // No separate dynamic config file: build the quorum configuration from these same properties.
    if (dynamicConfigFileStr == null) {
        setupQuorumPeerConfig(zkProp, true);
        if (isDistributed() && isReconfigEnabled()) {
            // we don't backup static config for standalone mode.
            // we also don't backup if reconfig feature is disabled.
            backupOldConfig();
        }
    }
}
  1. setupQuorumPeerConfig()方法涉及myid
java
/**
 * Builds the quorum verifier from {@code prop} and then runs the remaining
 * setup steps in order: server id (myid), client port, peer type, and a
 * final validity check.
 *
 * @param prop properties to derive the quorum configuration from
 * @param configBackwardCompatibilityMode forwarded to {@code parseDynamicConfig}
 * @throws IOException declared by the setup steps (setupMyId reads the myid file)
 * @throws ConfigException declared by the setup steps
 */
void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode) throws IOException, ConfigException {
    quorumVerifier = parseDynamicConfig(prop, electionAlg, true, configBackwardCompatibilityMode, oraclePath);
    // Read the myid file and set the server id.
    setupMyId();
    setupClientPort();
    setupPeerType();
    checkValidity();
}
  1. myid文件读取并设置,可以看出myid文件需要在dataDir目录下。
java
/**
 * Reads the server id from the {@code myid} file located directly under
 * {@code dataDir} and stores it in {@code serverId}. If the file does not
 * exist (or is not a regular file) the method returns without changing
 * anything.
 *
 * @throws IOException if the myid file cannot be opened or read
 * @throws IllegalArgumentException if the first line of the file is not a
 *         valid long; the underlying NumberFormatException is kept as the cause
 */
private void setupMyId() throws IOException {
    File myIdFile = new File(dataDir, "myid");
    // standalone server doesn't need myid file.
    if (!myIdFile.isFile()) {
        return;
    }
    String myIdString;
    // try-with-resources closes the reader on every path and, unlike a manual
    // finally { close(); }, does not let a close() failure mask a read failure.
    try (BufferedReader br = new BufferedReader(new FileReader(myIdFile))) {
        // Only the first line of the file is read.
        myIdString = br.readLine();
    }
    try {
        // Assign the id parsed from the myid file to serverId.
        serverId = Long.parseLong(myIdString);
        MDC.put("myid", myIdString);
    } catch (NumberFormatException e) {
        // Preserve the parse failure as the cause so the stack trace stays diagnosable.
        throw new IllegalArgumentException("serverid " + myIdString + " is not a number", e);
    }
}

4. 过期快照删除

  1. 启动定时任务,对过期的快照,执行删除。默认该功能是关闭的。
java
/**
 * Parses the configuration when exactly one argument is supplied, starts the
 * data-directory purge manager, and then runs the server either as a quorum
 * member or in standalone mode.
 *
 * @param args command-line arguments; a single element is treated as the config file path
 * @throws ConfigException if the configuration cannot be parsed
 * @throws IOException declared by the quorum start-up path
 * @throws AdminServerException declared by the quorum start-up path
 */
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
    // Holder for the ZK configuration.
    QuorumPeerConfig config = new QuorumPeerConfig();
    final boolean hasConfigPath = args.length == 1;
    if (hasConfigPath) {
        config.parse(args[0]);
    }
    // Step 2: start the scheduled task that deletes expired snapshots
    // (disabled by default).
    new DatadirCleanupManager(
        config.getDataDir(),
        config.getDataLogDir(),
        config.getSnapRetainCount(),
        config.getPurgeInterval()).start();
    if (!hasConfigPath || !config.isDistributed()) {
        LOG.warn("Either no config or no quorum defined in config, running in standalone mode");
        // Only one server in the quorum: run as standalone.
        ZooKeeperServerMain.main(args);
        return;
    }
    // Step 3: start as a member of the cluster.
    runFromConfig(config);
}
  1. 定时任务的创建
java
// Snapshot retain count; initialized to 3.
protected int snapRetainCount = 3;
// Purge interval; initialized to 0.
// NOTE(review): presumably this is the value that reaches DatadirCleanupManager, whose
// start() skips scheduling when the interval is <= 0 and otherwise treats it as hours — confirm.
protected int purgeInterval = 0;

/**
 * Schedules the periodic purge task unless it is already running or the
 * configured purge interval is zero or negative.
 */
public void start() {
    if (purgeTaskStatus == PurgeTaskStatus.STARTED) {
        LOG.warn("Purge task is already running.");
        return;
    }
    // With purgeInterval <= 0 (0 is the default) the task stays disabled,
    // so there is nothing to schedule.
    if (purgeInterval <= 0) {
        LOG.info("Purge task is not scheduled.");
        return;
    }

    // Daemon timer that drives the snapshot clean-up task.
    timer = new Timer("PurgeTask", true);
    // The interval is expressed in hours: a value of 1 runs the purge check
    // once per hour, starting immediately.
    timer.scheduleAtFixedRate(
        new PurgeTask(dataLogDir, snapDir, snapRetainCount),
        0,
        TimeUnit.HOURS.toMillis(purgeInterval));

    purgeTaskStatus = PurgeTaskStatus.STARTED;
}
  1. PurgeTask类为任务处理
java
/**
 * Timer task that delegates to {@code PurgeTxnLog.purge} and logs, rather
 * than propagates, any exception raised while purging.
 */
static class PurgeTask extends TimerTask {

    private File logsDir;
    private File snapsDir;
    private int snapRetainCount;

    /**
     * @param dataDir directory passed to the purge as the log directory
     * @param snapDir directory passed to the purge as the snapshot directory
     * @param count number of snapshots passed to the purge as the retain count
     */
    public PurgeTask(File dataDir, File snapDir, int count) {
        this.logsDir = dataDir;
        this.snapsDir = snapDir;
        this.snapRetainCount = count;
    }

    @Override
    public void run() {
        LOG.info("Purge task started.");
        purgeAndLogFailure();
        LOG.info("Purge task completed.");
    }

    /** Runs one purge pass; failures are logged so the timer keeps firing. */
    private void purgeAndLogFailure() {
        try {
            // Clean up the expired data.
            PurgeTxnLog.purge(this.logsDir, this.snapsDir, this.snapRetainCount);
        } catch (Exception e) {
            LOG.error("Error occurred while purging.", e);
        }
    }
}
  1. purge()方法
java
/**
 * Looks up the {@code num} most recent snapshots and purges everything older
 * than the last snapshot in that list.
 *
 * @param dataDir first directory handed to FileTxnSnapLog
 * @param snapDir second directory handed to FileTxnSnapLog
 * @param num number of recent snapshots to look up; must be at least 3
 * @throws IOException declared by the underlying snapshot/log operations
 * @throws IllegalArgumentException if {@code num} is less than 3
 */
public static void purge(File dataDir, File snapDir, int num) throws IOException {
    if (num < 3) {
        throw new IllegalArgumentException(COUNT_ERR_MSG);
    }

    FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);
    List<File> recentSnaps = txnLog.findNRecentSnapshots(num);
    if (recentSnaps.isEmpty()) {
        // No snapshots found: nothing to purge.
        return;
    }
    // The last entry of the list is used as the purge boundary.
    File boundarySnapshot = recentSnaps.get(recentSnaps.size() - 1);
    purgeOlderSnapshots(txnLog, boundarySnapshot);
}

5. 初始化通信组件

回到initializeAndRun()方法看runFromConfig方法

java
/**
 * Parses the configuration when exactly one argument is supplied, starts the
 * data-directory purge manager, and then runs the server either as a quorum
 * member (cluster mode) or in standalone mode.
 *
 * @param args command-line arguments; a single element is treated as the config file path
 * @throws ConfigException if the configuration cannot be parsed
 * @throws IOException declared by the quorum start-up path
 * @throws AdminServerException declared by the quorum start-up path
 */
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
    QuorumPeerConfig config = new QuorumPeerConfig();
    final boolean hasConfigPath = args.length == 1;
    if (hasConfigPath) {
        config.parse(args[0]);
    }

    // Start and schedule the purge task.
    new DatadirCleanupManager(
        config.getDataDir(),
        config.getDataLogDir(),
        config.getSnapRetainCount(),
        config.getPurgeInterval()).start();

    if (!hasConfigPath || !config.isDistributed()) {
        LOG.warn("Either no config or no quorum defined in config, running in standalone mode");
        // Only one server in the quorum: step 3 becomes a standalone (local mode) start.
        ZooKeeperServerMain.main(args);
        return;
    }
    // Step 3: start as a member of the cluster (cluster mode).
    runFromConfig(config);
}
  1. 通信协议默认NIO(可以支持Netty)
java
/**
 * Starts this server as a quorum member: creates the client connection
 * factories, copies the parsed configuration onto a QuorumPeer, initializes
 * and starts it, and then blocks in {@code join()} until the peer finishes.
 * The metrics provider is stopped in the finally block.
 *
 * @param config parsed quorum configuration
 * @throws IOException declared by the connection-factory and peer setup calls
 * @throws AdminServerException declared by the peer setup calls
 */
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
    ......

    LOG.info("Starting quorum peer, myid=" + config.getServerId());
    .......
    try {
        ServerMetrics.metricsProviderInitialized(metricsProvider);
        ProviderRegistry.initialize();
        ServerCnxnFactory cnxnFactory = null;
        ServerCnxnFactory secureCnxnFactory = null;
        // Initialize the communication components; NIO is the default transport.
        if (config.getClientPortAddress() != null) {
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
        }

        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
        }
        // Copy the parsed configuration values onto this ZooKeeper node.
        quorumPeer = getQuorumPeer();
        quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
        quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
        quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
        //quorumPeer.setQuorumPeers(config.getAllMembers());
        quorumPeer.setElectionType(config.getElectionAlg());
        quorumPeer.setMyid(config.getServerId());
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setInitLimit(config.getInitLimit());
        quorumPeer.setSyncLimit(config.getSyncLimit());
        quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
        quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
        quorumPeer.setConfigFileName(config.getConfigFilename());
        quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
        // ZKDatabase manages the storage of the ZK data.
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
        if (config.getLastSeenQuorumVerifier() != null) {
            quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
        }
        quorumPeer.initConfigInZKDatabase();
        // The connection factories manage ZK communication.
        quorumPeer.setCnxnFactory(cnxnFactory);
        quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
        quorumPeer.setSslQuorum(config.isSslQuorum());
        quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
        quorumPeer.setLearnerType(config.getPeerType());
        quorumPeer.setSyncEnabled(config.getSyncEnabled());
        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
        if (config.sslQuorumReloadCertFiles) {
            quorumPeer.getX509Util().enableCertFileReloading();
        }
        quorumPeer.setMultiAddressEnabled(config.isMultiAddressEnabled());
        quorumPeer.setMultiAddressReachabilityCheckEnabled(config.isMultiAddressReachabilityCheckEnabled());
        quorumPeer.setMultiAddressReachabilityCheckTimeoutMs(config.getMultiAddressReachabilityCheckTimeoutMs());

        // sets quorum sasl authentication configurations
        quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
        if (quorumPeer.isQuorumSaslAuthEnabled()) {
            quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
            quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
            quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
            quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
            quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
        }
        quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
        quorumPeer.initialize();

        if (config.jvmPauseMonitorToRun) {
            quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
        }
        // Start the ZK service.
        quorumPeer.start();
        ZKAuditProvider.addZKStartStopAuditLog();
        // Block here until the quorum peer terminates.
        quorumPeer.join();
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        // NOTE(review): the interrupt status is not restored here — confirm that is intended.
        LOG.warn("Quorum Peer interrupted", e);
    } finally {
        try {
            metricsProvider.stop();
        } catch (Throwable error) {
            LOG.warn("Error while stopping metrics", error);
        }
    }
}

为何Zookeeper的网络底层是NIO?

在doc文件夹下面zookeeperAdmin.html文件中,有这样描述的: Alt text

  1. 初始化NIO服务端Socket(并未启动) NIOServerCnxnFactory.java源码
java
/**
 * Prepares the NIO connection factory: records the connection limits, builds
 * the expiry queue and expirer thread, sizes and creates the selector
 * threads, then opens, binds and makes non-blocking the server socket channel
 * and creates the accept thread. No thread is started in this method.
 *
 * @param addr address the server socket is bound to
 * @param maxcc value stored as the maximum number of client connections
 * @param backlog listen backlog; -1 binds without an explicit backlog
 * @param secure must be false; SSL is not supported by this factory
 * @throws IOException if the resolved selector-thread count is below 1, or
 *         from the socket open/bind/configure calls
 * @throws UnsupportedOperationException if {@code secure} is true
 */
public void configure(InetSocketAddress addr, int maxcc, int backlog, boolean secure) throws IOException {
    if (secure) {
        throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn");
    }
    configureSaslLogin();

    maxClientCnxns = maxcc;
    initMaxCnxns();
    // Timeout in milliseconds (logged below in seconds); defaults to 10000 unless the system property is set.
    sessionlessCnxnTimeout = Integer.getInteger(ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
    // We also use the sessionlessCnxnTimeout as expiring interval for
    // cnxnExpiryQueue. These don't need to be the same, but the expiring
    // interval passed into the ExpiryQueue() constructor below should be
    // less than or equal to the timeout.
    cnxnExpiryQueue = new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout);
    expirerThread = new ConnectionExpirerThread();

    int numCores = Runtime.getRuntime().availableProcessors();
    // 32 cores sweet spot seems to be 4 selector threads
    // Default is max(sqrt(numCores / 2), 1) unless overridden by the system property.
    numSelectorThreads = Integer.getInteger(
        ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
        Math.max((int) Math.sqrt((float) numCores / 2), 1));
    // Guards against a system-property override below 1.
    if (numSelectorThreads < 1) {
        throw new IOException("numSelectorThreads must be at least 1");
    }

    // Defaults: 2 * numCores worker threads and a 5000 shutdown timeout, each overridable by system property.
    numWorkerThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
    workerShutdownTimeoutMS = Long.getLong(ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);

    String logMsg = "Configuring NIO connection handler with "
        + (sessionlessCnxnTimeout / 1000) + "s sessionless connection timeout, "
        + numSelectorThreads + " selector thread(s), "
        + (numWorkerThreads > 0 ? numWorkerThreads : "no") + " worker threads, and "
        + (directBufferBytes == 0 ? "gathered writes." : ("" + (directBufferBytes / 1024) + " kB direct buffers."));
    LOG.info(logMsg);
    for (int i = 0; i < numSelectorThreads; ++i) {
        selectorThreads.add(new SelectorThread(i));
    }

    listenBacklog = backlog;
    // Open the NIO server socket; once bound to the client port (2181 in the
    // default setup) it can accept client requests.
    this.ss = ServerSocketChannel.open();
    ss.socket().setReuseAddress(true);
    LOG.info("binding to port {}", addr);
    // Bind to the client port (2181 in the default setup).
    if (listenBacklog == -1) {
        ss.socket().bind(addr);
    } else {
        ss.socket().bind(addr, listenBacklog);
    }
    ss.configureBlocking(false);
    acceptThread = new AcceptThread(ss, addr, selectorThreads);
}

6. 启动zk服务

QuorumPeer.java的start()方法

java
/**
 * Starts this quorum peer: verifies that its own id is part of the current
 * view, loads the database, starts the client connection factory and the
 * admin server, kicks off leader election and the JVM pause monitor, and
 * finally invokes the superclass {@code start()}.
 *
 * @throws RuntimeException if {@code myid} is not contained in the peer view
 */
@Override
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    // Load the data.
    loadDataBase();
    // Start socket communication.
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        // An admin-server failure is only logged; start-up continues.
        LOG.warn("Problem starting AdminServer", e);
    }
    // Begin the leader election (voting).
    startLeaderElection();
    startJvmPauseMonitor();
    super.start();
}