
DataNode Startup Source Code Analysis


1. Add the following dependencies to pom.xml

xml
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.3.6</version>
</dependency>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>3.3.6</version>
</dependency>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs-client</artifactId>
    <version>3.3.6</version>
    <scope>provided</scope>
</dependency>
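
With these dependencies on the classpath, one convenient way to follow the startup path in a debugger is a small launcher that drives the same createDataNode() entry point examined below. This is only a sketch: the class name DataNodeDebugLauncher is made up for illustration, and it assumes valid core-site.xml/hdfs-site.xml on the classpath plus a reachable NameNode; otherwise startup fails right where the analysis begins.

java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.datanode.DataNode;

// Hypothetical debug entry point; not part of Hadoop itself.
public class DataNodeDebugLauncher {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration(); // loads *-site.xml found on the classpath
    // Same entry point secureMain() uses: instantiate the DataNode and start its daemons.
    DataNode dn = DataNode.createDataNode(new String[0], conf, null);
    System.out.println("Started " + dn.getDisplayName());
    // Keep the JVM alive while the DataNode's daemon threads run.
    Thread.currentThread().join();
  }
}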

2. Press Ctrl + N to search globally for datanode and open DataNode.java

  1. Official description of DataNode:

DataNode is a class (and program) that stores a set of blocks for a DFS deployment. A single deployment can have one or many DataNodes. Each DataNode communicates regularly with a single NameNode. It also communicates with client code and other DataNodes from time to time. DataNodes store a series of named blocks. The DataNode allows client code to read these blocks, or to write new block data. The DataNode may also, in response to instructions from its NameNode, delete blocks or copy blocks to/from other DataNodes. The DataNode maintains just one critical table: block-> stream of bytes (of BLOCK_SIZE or less) This info is stored on a local disk. The DataNode reports the table's contents to the NameNode upon startup and every so often afterwards. DataNodes spend their lives in an endless loop of asking the NameNode for something to do. A NameNode cannot connect to a DataNode directly; a NameNode simply returns values from functions invoked by a DataNode. DataNodes maintain an open server socket so that client code or other DataNodes can read/write data. The host/port for this server is reported to the NameNode, which then sends that information to clients or other DataNodes that might be interested.

  2. Press Ctrl + F to find the main method in DataNode.java
java
public static void main(String args[]) {
  if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {
    System.exit(0);
  }

  secureMain(args, null);
}

public static void secureMain(String args[], SecureResources resources) {
  int errorCode = 0;
  try {
    StringUtils.startupShutdownMessage(DataNode.class, args, LOG);

    DataNode datanode = createDataNode(args, null, resources);

    … …
  } catch (Throwable e) {
    LOG.error("Exception in secureMain", e);
    terminate(1, e);
  } finally {
    LOG.warn("Exiting Datanode");
    terminate(errorCode);
  }
}

public static DataNode createDataNode(String args[], Configuration conf,
    SecureResources resources) throws IOException {
  // Instantiate the DataNode
  DataNode dn = instantiateDataNode(args, conf, resources);

  if (dn != null) {
    // Start the DataNode daemon
    dn.runDatanodeDaemon();
  }
  return dn;
}

public static DataNode instantiateDataNode(String args [], Configuration conf,
    SecureResources resources) throws IOException {
  ... ...
  
  return makeInstance(dataLocations, conf, resources);
}

static DataNode makeInstance(Collection<StorageLocation> dataDirs,
    Configuration conf, SecureResources resources) throws IOException {
  ... ...
  return new DataNode(conf, locations, storageLocationChecker, resources);
}

DataNode(final Configuration conf,
         final List<StorageLocation> dataDirs,
         final StorageLocationChecker storageLocationChecker,
         final SecureResources resources) throws IOException {
  super(conf);
  ... ...

  try {
    hostName = getHostName(conf);
    LOG.info("Configured hostname is {}", hostName);
    // Start the DataNode
    startDataNode(dataDirs, resources);
  } catch (IOException ie) {
    shutdown();
    throw ie;
  }
  ... ...
}

void startDataNode(List<StorageLocation> dataDirectories,
                   SecureResources resources
                   ) throws IOException {
  ... ...
  // Create the data storage object
  storage = new DataStorage();
  
  // global DN settings
  registerMXBean();
  // Initialize the DataXceiverServer
  initDataXceiver();
  
  // Start the HTTP server
  startInfoServer();

  pauseMonitor = new JvmPauseMonitor();
  pauseMonitor.init(getConf());
  pauseMonitor.start();

  // BlockPoolTokenSecretManager is required to create ipc server.
  this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();

  // Login is done by now. Set the DN user name.
  dnUserName = UserGroupInformation.getCurrentUser().getUserName();
  LOG.info("dnUserName = {}", dnUserName);
  LOG.info("supergroup = {}", supergroup);
  
  // Initialize the RPC server
  initIpcServer();

  metrics = DataNodeMetrics.create(getConf(), getDisplayName());
  peerMetrics = dnConf.peerStatsEnabled ?
      DataNodePeerMetrics.create(getDisplayName(), getConf()) : null;
  metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);

  ecWorker = new ErasureCodingWorker(getConf(), this);
  blockRecoveryWorker = new BlockRecoveryWorker(this);
  
  // Create the BlockPoolManager
  blockPoolManager = new BlockPoolManager(this);
  // Heartbeat management (registration and heartbeats start from here)
  blockPoolManager.refreshNamenodes(getConf());

  // Create the ReadaheadPool from the DataNode context so we can
  // exit without having to explicitly shutdown its thread pool.
  readaheadPool = ReadaheadPool.getInstance();
  saslClient = new SaslDataTransferClient(dnConf.getConf(),
      dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
  saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
  startMetricsLogger();

  if (dnConf.diskStatsEnabled) {
    diskMetrics = new DataNodeDiskMetrics(this,
        dnConf.outliersReportIntervalMs);
  }
}

3. Initializing the DataXceiverServer

Click initDataXceiver()
DataNode.java

java
private void initDataXceiver() throws IOException {
  // dataXceiverServer is the service the DataNode uses to receive data from clients and other DataNodes
  this.dataXceiverServer = new Daemon(threadGroup, xserver);
  this.threadGroup.setDaemon(true); // auto destroy when empty

  ... ...
}
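
The Daemon/ThreadGroup combination above is plain Hadoop utility code: org.apache.hadoop.util.Daemon is just a Thread with the daemon flag pre-set. A minimal standalone sketch of the same idiom, with a placeholder runnable standing in for the real xserver:

java
import org.apache.hadoop.util.Daemon;

public class DaemonGroupSketch {
  public static void main(String[] args) throws Exception {
    // One shared group lets the DN manage (and count) all of its xceiver threads together.
    ThreadGroup group = new ThreadGroup("dataXceiverServer-sketch");
    group.setDaemon(true); // auto destroy when empty, as in initDataXceiver()

    // Placeholder for the accept() loop that would hand each incoming connection
    // to a new Daemon in the same group, which is what DataXceiverServer does.
    Runnable server = () ->
        System.out.println("server loop running in " + Thread.currentThread().getName());

    Daemon d = new Daemon(group, server);
    d.start();
    d.join();
  }
}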

4. Initializing the HTTP service

Click startInfoServer()
DataNode.java

java
private void startInfoServer()
  throws IOException {
  // SecureDataNodeStarter will bind the privileged port to the channel if
  // the DN is started by JSVC, pass it along.
  ServerSocketChannel httpServerChannel = secureResources != null ?
      secureResources.getHttpServerChannel() : null;

  httpServer = new DatanodeHttpServer(getConf(), this, httpServerChannel);
  httpServer.start();
  if (httpServer.getHttpAddress() != null) {
    infoPort = httpServer.getHttpAddress().getPort();
  }
  if (httpServer.getHttpsAddress() != null) {
    infoSecurePort = httpServer.getHttpsAddress().getPort();
  }
}

DatanodeHttpServer.java

java
public DatanodeHttpServer(final Configuration conf,
    final DataNode datanode,
    final ServerSocketChannel externalHttpChannel)
  throws IOException {
  
  ... ...
  HttpServer2.Builder builder = new HttpServer2.Builder()
      .setName("datanode")
      .setConf(confForInfoServer)
      .setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")))
      .hostName(getHostnameForSpnegoPrincipal(confForInfoServer))
      .addEndpoint(URI.create("http://localhost:" + proxyPort))
      .setFindPort(true);
  ... ...
}
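
A quick way to confirm this HTTP server came up is to query its /jmx endpoint. The sketch below is illustrative only: it assumes a DataNode on localhost, the Hadoop 3.x default HTTP port 9864 (dfs.datanode.http.address), and the standard DataNodeInfo MBean name.

java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class DataNodeJmxProbe {
  public static void main(String[] args) throws Exception {
    // Adjust the port if dfs.datanode.http.address is overridden in your cluster.
    HttpRequest req = HttpRequest.newBuilder(
        URI.create("http://localhost:9864/jmx?qry=Hadoop:service=DataNode,name=DataNodeInfo"))
        .GET()
        .build();
    HttpResponse<String> resp = HttpClient.newHttpClient()
        .send(req, HttpResponse.BodyHandlers.ofString());
    System.out.println(resp.statusCode());
    System.out.println(resp.body()); // JSON with version, cluster id, volume info, ...
  }
}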

5. Initializing the DataNode's RPC server

Click initIpcServer
DataNode.java

java
private void initIpcServer() throws IOException {
  InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
      getConf().getTrimmed(DFS_DATANODE_IPC_ADDRESS_KEY));
  
  ... ...
  ipcServer = new RPC.Builder(getConf())
      .setProtocol(ClientDatanodeProtocolPB.class)
      .setInstance(service)
      .setBindAddress(ipcAddr.getHostName())
      .setPort(ipcAddr.getPort())
      .setNumHandlers(
          getConf().getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
              DFS_DATANODE_HANDLER_COUNT_DEFAULT)).setVerbose(false)
      .setSecretManager(blockPoolTokenSecretManager).build();
  ... ...
}
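
Both the bind address and the handler count come straight from configuration. A small sketch that reads the same keys; the defaults shown are the usual Hadoop 3.x values and should be treated as assumptions if your cluster overrides them:

java
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;

public class DataNodeIpcConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // DFS_DATANODE_IPC_ADDRESS_KEY -> "dfs.datanode.ipc.address" (default 0.0.0.0:9867)
    String ipc = conf.getTrimmed("dfs.datanode.ipc.address", "0.0.0.0:9867");
    InetSocketAddress ipcAddr = NetUtils.createSocketAddr(ipc);
    // DFS_DATANODE_HANDLER_COUNT_KEY -> "dfs.datanode.handler.count" (default 10)
    int handlers = conf.getInt("dfs.datanode.handler.count", 10);
    System.out.println("DataNode IPC server would bind " + ipcAddr
        + " with " + handlers + " handler threads");
  }
}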

6. DataNode registration with the NameNode

Click refreshNamenodes
BlockPoolManager.java

java
void refreshNamenodes(Configuration conf)
    throws IOException {
  ... ...

  synchronized (refreshNamenodesLock) {
    doRefreshNamenodes(newAddressMap, newLifelineAddressMap);
  }
}

private void doRefreshNamenodes(
    Map<String, Map<String, InetSocketAddress>> addrMap,
    Map<String, Map<String, InetSocketAddress>> lifelineAddrMap)
    throws IOException {
  … ….
  
  synchronized (this) {
    … …

    // Step 3. Start new nameservices
    if (!toAdd.isEmpty()) {

      for (String nsToAdd : toAdd) {
        … …
        BPOfferService bpos = createBPOS(nsToAdd, addrs, lifelineAddrs);
        bpByNameserviceId.put(nsToAdd, bpos);
        offerServices.add(bpos);
      }
    }
    startAll();
  }

  … …
}

protected BPOfferService createBPOS(
    final String nameserviceId,
    List<InetSocketAddress> nnAddrs,
    List<InetSocketAddress> lifelineNnAddrs) {
  // Create the corresponding services according to the number of NameNodes
  return new BPOfferService(nameserviceId, nnAddrs, lifelineNnAddrs, dn);
}

Click startAll()

java
synchronized void startAll() throws IOException {
  try {
    UserGroupInformation.getLoginUser().doAs(
        new PrivilegedExceptionAction<Object>() {
          @Override
          public Object run() throws Exception {
            for (BPOfferService bpos : offerServices) {
              // Start the service
              bpos.start();
            }
            return null;
          }
        });
  } catch (InterruptedException ex) {
    ... ...
  }
}
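
Note that startAll() wraps the actual work in doAs(), so the BPServiceActor threads inherit the DataNode's login identity; this matters on Kerberized clusters. A minimal sketch of that UserGroupInformation idiom on its own:

java
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.security.UserGroupInformation;

public class DoAsSketch {
  public static void main(String[] args) throws Exception {
    // getLoginUser() is the identity established at process start
    // (the Kerberos login on secure clusters, the OS user otherwise).
    UserGroupInformation login = UserGroupInformation.getLoginUser();
    login.doAs(new PrivilegedExceptionAction<Void>() {
      @Override
      public Void run() throws Exception {
        // Anything started here (threads, RPC clients) runs as the login user,
        // which is why BlockPoolManager starts its BPOfferServices inside doAs().
        System.out.println("running as " + UserGroupInformation.getCurrentUser());
        return null;
      }
    });
  }
}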

Click start()
BPOfferService.java

java
void start() {
  for (BPServiceActor actor : bpServices) {
    actor.start();
  }
}

Click start()
BPServiceActor.java

java
void start() {
  ... ...
  bpThread = new Thread(this);
  bpThread.setDaemon(true); // needed for JUnit testing
  // A new thread is started here, so next look at this thread's run() method
  bpThread.start();

  if (lifelineSender != null) {
    lifelineSender.start();
  }
}
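
The run() method examined next follows a fixed shape: retry initialization until registration succeeds, then loop offering service forever. A stripped-down sketch of that daemon-thread pattern (all names here are invented for illustration):

java
public class ActorThreadSketch implements Runnable {
  @Override
  public void run() {
    // Phase 1: retry initialization until it succeeds (cf. connectToNNAndHandshake()).
    while (true) {
      try {
        initialize();
        break;
      } catch (Exception e) {
        sleepQuietly(5000); // the real actor also retries registration every 5 seconds
      }
    }
    // Phase 2: the steady-state service loop (cf. offerService()).
    while (!Thread.currentThread().isInterrupted()) {
      doOneRoundOfWork();
      sleepQuietly(3000);
    }
  }

  private void initialize() throws Exception { /* handshake / registration would go here */ }

  private void doOneRoundOfWork() { /* heartbeat / block report would go here */ }

  private static void sleepQuietly(long ms) {
    try {
      Thread.sleep(ms);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    Thread t = new Thread(new ActorThreadSketch());
    t.setDaemon(true); // same flag BPServiceActor sets on bpThread
    t.start();
    Thread.sleep(10_000); // let the daemon run briefly; the JVM then exits since only daemon threads remain
  }
}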

Press Ctrl + F to search for the run method

java
public void run() {
  LOG.info(this + " starting to offer service");

  try {
    while (true) {
      // init stuff
      try {
        // setup storage
        // Register with the NameNode
        connectToNNAndHandshake();
        break;
      } catch (IOException ioe) {
        // Initial handshake, storage recovery or registration failed
        runningState = RunningState.INIT_FAILED;
        if (shouldRetryInit()) {
          // Retry until all namenode's of BPOS failed initialization
          LOG.error("Initialization failed for " + this + " "
              + ioe.getLocalizedMessage());
          // Registration failed; retry after 5 seconds
          sleepAndLogInterrupts(5000, "initializing");
        } else {
          runningState = RunningState.FAILED;
          LOG.error("Initialization failed for " + this + ". Exiting. ", ioe);
          return;
        }
      }
    }
    … …
    while (shouldRun()) {
      try {
        // Send heartbeats
        offerService();
      } catch (Exception ex) {
        ... ...
      }
    }
    ... ...
  } catch (Throwable ex) {
    ... ...
  } finally {
    ... ...
  }
}

private void connectToNNAndHandshake() throws IOException {
  // get NN proxy (obtain the RPC client object for the NameNode)
  bpNamenode = dn.connectToNN(nnAddr);

  // First phase of the handshake with NN - get the namespace
  // info.
  NamespaceInfo nsInfo = retrieveNamespaceInfo();

  // Verify that this matches the other NN in this HA pair.
  // This also initializes our block pool in the DN if we are
  // the first NN connection for this BP.
  bpos.verifyAndSetNamespaceInfo(this, nsInfo);

  /* set thread name again to include NamespaceInfo when it's available. */
  this.bpThread.setName(formatThreadName("heartbeating", nnAddr));

  // Register with the NameNode
  register(nsInfo);
}

DataNode.java

java
DatanodeProtocolClientSideTranslatorPB connectToNN(
    InetSocketAddress nnAddr) throws IOException {
  return new DatanodeProtocolClientSideTranslatorPB(nnAddr, getConf());
}

DatanodeProtocolClientSideTranslatorPB.java

java
public DatanodeProtocolClientSideTranslatorPB(InetSocketAddress nameNodeAddr,
    Configuration conf) throws IOException {
  RPC.setProtocolEngine(conf, DatanodeProtocolPB.class,
      ProtobufRpcEngine.class);
  UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
  rpcProxy = createNamenode(nameNodeAddr, conf, ugi);
}

private static DatanodeProtocolPB createNamenode(
    InetSocketAddress nameNodeAddr, Configuration conf,
    UserGroupInformation ugi) throws IOException {
  return RPC.getProxy(DatanodeProtocolPB.class,
      RPC.getProtocolVersion(DatanodeProtocolPB.class), nameNodeAddr, ugi,
      conf, NetUtils.getSocketFactory(conf, DatanodeProtocolPB.class));
}

Click register
BPServiceActor.java

java
void register(NamespaceInfo nsInfo) throws IOException {
  // Create the registration information
  DatanodeRegistration newBpRegistration = bpos.createRegistration();

  LOG.info(this + " beginning handshake with NN");

  while (shouldRun()) {
    try {
      // Use returned registration from namenode with updated fields
      // Send the registration info to the NN (the DN calls the interface method; it executes on the NN)
      newBpRegistration = bpNamenode.registerDatanode(newBpRegistration);
      newBpRegistration.setNamespaceInfo(nsInfo);
      bpRegistration = newBpRegistration;
      break;
    } catch(EOFException e) {  // namenode might have just restarted
      LOG.info("Problem connecting to server: " + nnAddr + " :"
          + e.getLocalizedMessage());
      sleepAndLogInterrupts(1000, "connecting to server");
    } catch(SocketTimeoutException e) {  // namenode is busy
      LOG.info("Problem connecting to server: " + nnAddr);
      sleepAndLogInterrupts(1000, "connecting to server");
    }
  }
  … …
}

Press Ctrl + N to search for NameNodeRpcServer
NameNodeRpcServer.java
Press Ctrl + F to search for registerDatanode in NameNodeRpcServer.java

java
public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg)
    throws IOException {
  checkNNStartup();
  verifySoftwareVersion(nodeReg);
  // Register the DataNode
  namesystem.registerDatanode(nodeReg);

  return nodeReg;
}

FSNamesystem.java

java
void registerDatanode(DatanodeRegistration nodeReg) throws IOException {
  writeLock();
  try {
    blockManager.registerDatanode(nodeReg);
  } finally {
    writeUnlock("registerDatanode");
  }
}

BlockManager.java

java
public void registerDatanode(DatanodeRegistration nodeReg)
    throws IOException {
  assert namesystem.hasWriteLock();
  datanodeManager.registerDatanode(nodeReg);
  bmSafeMode.checkSafeMode();
}

DatanodeManager.java

java
public void registerDatanode(DatanodeRegistration nodeReg)
    throws DisallowedDatanodeException, UnresolvedTopologyException {
    ... ...
    // register new datanode
    addDatanode(nodeDescr);
    blockManager.getBlockReportLeaseManager().register(nodeDescr);
    // also treat the registration message as a heartbeat
    // no need to update its timestamp
    // because its is done when the descriptor is created
    // Add the DataNode to heartbeat management
    heartbeatManager.addDatanode(nodeDescr);
    heartbeatManager.updateDnStat(nodeDescr);
    incrementVersionCount(nodeReg.getSoftwareVersion());
    startAdminOperationIfNecessary(nodeDescr);
    success = true;
    ... ...
}

void addDatanode(final DatanodeDescriptor node) {
  // To keep host2DatanodeMap consistent with datanodeMap,
  // remove  from host2DatanodeMap the datanodeDescriptor removed
  // from datanodeMap before adding node to host2DatanodeMap.
  synchronized(this) {
    host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));
  }

  networktopology.add(node); // may throw InvalidTopologyException
  host2DatanodeMap.add(node);
  checkIfClusterIsNowMultiRack(node);
  resolveUpgradeDomain(node);

  … …
}
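
The line host2DatanodeMap.remove(datanodeMap.put(...)) relies on Map.put() returning the value it replaced. The same "keep a secondary index consistent" idiom with plain collections, purely as an illustration:

java
import java.util.HashMap;
import java.util.Map;

public class SecondaryIndexSketch {
  public static void main(String[] args) {
    Map<String, String> byUuid = new HashMap<>(); // cf. datanodeMap (uuid -> descriptor)
    Map<String, String> byHost = new HashMap<>(); // cf. host2DatanodeMap (host -> descriptor)

    // Initial registration.
    byUuid.put("uuid-1", "descriptor-v1");
    byHost.put("host-a", "descriptor-v1");

    // Re-registration of the same node: put() returns the descriptor it replaced,
    // and that stale value is what must be dropped from the secondary index
    // before the new one is added, which is the move addDatanode() makes in one line.
    String stale = byUuid.put("uuid-1", "descriptor-v2");
    byHost.values().remove(stale);
    byHost.put("host-a", "descriptor-v2");

    System.out.println(byUuid + " " + byHost); // both maps now point at descriptor-v2
  }
}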

7. Sending heartbeats to the NameNode

Click the offerService method inside the run method of BPServiceActor.java
BPServiceActor.java

java
private void offerService() throws Exception {

  while (shouldRun()) {
    ... ...
    HeartbeatResponse resp = null;
    if (sendHeartbeat) {

      boolean requestBlockReportLease = (fullBlockReportLeaseId == 0) &&
              scheduler.isBlockReportDue(startTime);
      if (!dn.areHeartbeatsDisabledForTests()) {
        // Send the heartbeat
        resp = sendHeartBeat(requestBlockReportLease);
        assert resp != null;
        if (resp.getFullBlockReportLeaseId() != 0) {
          if (fullBlockReportLeaseId != 0) {
            ... ...
          }
          fullBlockReportLeaseId = resp.getFullBlockReportLeaseId();
        }
        ... ...
      }
    }
    ... ...
  }
}

HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
    throws IOException {
  ... ...
  // Send the heartbeat to the NameNode through the NN RPC client proxy
  HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
        reports,
        dn.getFSDataset().getCacheCapacity(),
        dn.getFSDataset().getCacheUsed(),
        dn.getXmitsInProgress(),
        dn.getXceiverCount(),
        numFailedVolumes,
        volumeFailureSummary,
        requestBlockReportLease,
        slowPeers,
        slowDisks);
  ... ...
}

Press Ctrl + N to search for NameNodeRpcServer
NameNodeRpcServer.java
Press Ctrl + F to search for sendHeartbeat in NameNodeRpcServer.java

java
public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
    int xmitsInProgress, int xceiverCount,
    int failedVolumes, VolumeFailureSummary volumeFailureSummary,
    boolean requestFullBlockReportLease,
    @Nonnull SlowPeerReports slowPeers,
    @Nonnull SlowDiskReports slowDisks) throws IOException {

  checkNNStartup();
  verifyRequest(nodeReg);

  // Handle the heartbeat sent by the DataNode
  return namesystem.handleHeartbeat(nodeReg, report,
      dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
      failedVolumes, volumeFailureSummary, requestFullBlockReportLease,
      slowPeers, slowDisks);
}

HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] reports, long cacheCapacity, long cacheUsed,
    int xceiverCount, int xmitsInProgress, int failedVolumes,
    VolumeFailureSummary volumeFailureSummary,
    boolean requestFullBlockReportLease,
    @Nonnull SlowPeerReports slowPeers,
    @Nonnull SlowDiskReports slowDisks) throws IOException {
  readLock();
  try {
    //get datanode commands
    final int maxTransfer = blockManager.getMaxReplicationStreams()
        - xmitsInProgress;
    // Process the heartbeat sent by the DataNode
    DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
        nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
        xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary,
        slowPeers, slowDisks);

    long blockReportLeaseId = 0;
    if (requestFullBlockReportLease) {
      blockReportLeaseId =  blockManager.requestBlockReportLeaseId(nodeReg);
    }
    //create ha status
    final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
        haContext.getState().getServiceState(),
        getFSImage().getCorrectLastAppliedOrWrittenTxId());

    // Respond to the DataNode's heartbeat
    return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo,
        blockReportLeaseId);
  } finally {
    readUnlock("handleHeartbeat");
  }
}
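
handleHeartbeat() runs under the namespace read lock, with the unlock in a finally block so the lock is released even if command generation throws. The same guard shape with a plain ReentrantReadWriteLock (FSNamesystem's own lock wrapper adds metrics and the operation name, which this sketch leaves out):

java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadLockGuardSketch {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private long lastHeartbeatTime;

  // Reader path: many heartbeats can be processed concurrently,
  // but they are excluded while a writer (e.g. registerDatanode) holds the write lock.
  long handleHeartbeat() {
    lock.readLock().lock();
    try {
      return lastHeartbeatTime;
    } finally {
      lock.readLock().unlock(); // always released, mirroring readUnlock("handleHeartbeat")
    }
  }

  // Writer path, mirroring writeLock()/writeUnlock("registerDatanode").
  void registerDatanode(long now) {
    lock.writeLock().lock();
    try {
      lastHeartbeatTime = now;
    } finally {
      lock.writeLock().unlock();
    }
  }

  public static void main(String[] args) {
    ReadLockGuardSketch ns = new ReadLockGuardSketch();
    ns.registerDatanode(System.currentTimeMillis());
    System.out.println(ns.handleHeartbeat());
  }
}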

Click handleHeartbeat
DatanodeManager.java

java
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] reports, final String blockPoolId,
    long cacheCapacity, long cacheUsed, int xceiverCount, 
    int maxTransfers, int failedVolumes,
    VolumeFailureSummary volumeFailureSummary,
    @Nonnull SlowPeerReports slowPeers,
    @Nonnull SlowDiskReports slowDisks) throws IOException {
  ... ...
  heartbeatManager.updateHeartbeat(nodeinfo, reports, cacheCapacity,
      cacheUsed, xceiverCount, failedVolumes, volumeFailureSummary);
  ... ...  
}

HeartbeatManager.java

java
synchronized void updateHeartbeat(final DatanodeDescriptor node,
    StorageReport[] reports, long cacheCapacity, long cacheUsed,
    int xceiverCount, int failedVolumes,
    VolumeFailureSummary volumeFailureSummary) {
  stats.subtract(node);
  blockManager.updateHeartbeat(node, reports, cacheCapacity, cacheUsed,
      xceiverCount, failedVolumes, volumeFailureSummary);
  stats.add(node);
}
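
The subtract/update/add sequence keeps the cluster-wide totals consistent without recomputing them over every DataNode on each heartbeat. The same bookkeeping idiom in miniature, with invented names:

java
public class AggregateStatsSketch {
  // Cluster-wide total, cf. the Stats object maintained by HeartbeatManager.
  private long totalCapacity;

  static class Node {
    long capacity;
  }

  // Remove the node's old contribution, apply the new report, add it back.
  synchronized void updateHeartbeat(Node node, long reportedCapacity) {
    totalCapacity -= node.capacity;   // stats.subtract(node)
    node.capacity = reportedCapacity; // blockManager.updateHeartbeat(...)
    totalCapacity += node.capacity;   // stats.add(node)
  }

  public static void main(String[] args) {
    AggregateStatsSketch stats = new AggregateStatsSketch();
    Node dn = new Node();
    stats.updateHeartbeat(dn, 4L << 40); // first report: 4 TiB
    stats.updateHeartbeat(dn, 8L << 40); // a disk was added; the total stays consistent
    System.out.println(stats.totalCapacity); // 8796093022208
  }
}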

BlockManager.java

java
void updateHeartbeat(DatanodeDescriptor node, StorageReport[] reports,
    long cacheCapacity, long cacheUsed, int xceiverCount, int failedVolumes,
    VolumeFailureSummary volumeFailureSummary) {

  for (StorageReport report: reports) {
    providedStorageMap.updateStorage(node, report.getStorage());
  }
  node.updateHeartbeat(reports, cacheCapacity, cacheUsed, xceiverCount,
      failedVolumes, volumeFailureSummary);
}

DatanodeDescriptor.java

java
void updateHeartbeat(StorageReport[] reports, long cacheCapacity,
    long cacheUsed, int xceiverCount, int volFailures,
    VolumeFailureSummary volumeFailureSummary) {
  updateHeartbeatState(reports, cacheCapacity, cacheUsed, xceiverCount,
      volFailures, volumeFailureSummary);
  heartbeatedSinceRegistration = true;
}

void updateHeartbeatState(StorageReport[] reports, long cacheCapacity,
    long cacheUsed, int xceiverCount, int volFailures,
    VolumeFailureSummary volumeFailureSummary) {
  // Update storage statistics
  updateStorageStats(reports, cacheCapacity, cacheUsed, xceiverCount,
      volFailures, volumeFailureSummary);
  // Update the heartbeat timestamps
  setLastUpdate(Time.now());
  setLastUpdateMonotonic(Time.monotonicNow());
  rollBlocksScheduled(getLastUpdateMonotonic());
}

private void updateStorageStats(StorageReport[] reports, long cacheCapacity,
    long cacheUsed, int xceiverCount, int volFailures,
    VolumeFailureSummary volumeFailureSummary) {
  long totalCapacity = 0;
  long totalRemaining = 0;
  long totalBlockPoolUsed = 0;
  long totalDfsUsed = 0;
  long totalNonDfsUsed = 0;
  … …

  setCacheCapacity(cacheCapacity);
  setCacheUsed(cacheUsed);
  setXceiverCount(xceiverCount);
  this.volumeFailures = volFailures;
  this.volumeFailureSummary = volumeFailureSummary;
  for (StorageReport report : reports) {

    DatanodeStorageInfo storage =
        storageMap.get(report.getStorage().getStorageID());
    if (checkFailedStorages) {
      failedStorageInfos.remove(storage);
    }

    storage.receivedHeartbeat(report);
    // skip accounting for capacity of PROVIDED storages!
    if (StorageType.PROVIDED.equals(storage.getStorageType())) {
      continue;
    }

    totalCapacity += report.getCapacity();
    totalRemaining += report.getRemaining();
    totalBlockPoolUsed += report.getBlockPoolUsed();
    totalDfsUsed += report.getDfsUsed();
    totalNonDfsUsed += report.getNonDfsUsed();
  }

  // Update total metrics for the node.
  // Update the storage-related totals
  setCapacity(totalCapacity);
  setRemaining(totalRemaining);
  setBlockPoolUsed(totalBlockPoolUsed);
  setDfsUsed(totalDfsUsed);
  setNonDfsUsed(totalNonDfsUsed);
  if (checkFailedStorages) {
    updateFailedStorage(failedStorageInfos);
  }
  long storageMapSize;
  synchronized (storageMap) {
    storageMapSize = storageMap.size();
  }
  if (storageMapSize != reports.length) {
    pruneStorageMap(reports);
  }
}
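
The timestamps updated above are what the NameNode later checks to decide whether a DataNode is dead. With the default dfs.heartbeat.interval of 3 seconds and dfs.namenode.heartbeat.recheck-interval of 300000 ms, the timeout works out to 2 * 300000 + 10 * 1000 * 3 = 630000 ms, i.e. 10 minutes 30 seconds. A sketch of that arithmetic read from configuration (the defaults are assumptions if your cluster overrides them):

java
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;

public class HeartbeatTimeoutSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Hadoop 3.x defaults: 3 seconds and 300000 ms respectively.
    long heartbeatIntervalSec = conf.getTimeDuration(
        "dfs.heartbeat.interval", 3, TimeUnit.SECONDS);
    long recheckIntervalMs = conf.getLong(
        "dfs.namenode.heartbeat.recheck-interval", 5 * 60 * 1000);

    // Formula used by DatanodeManager for the heartbeat expiry interval.
    long expireMs = 2 * recheckIntervalMs + 10 * 1000 * heartbeatIntervalSec;
    System.out.println("DataNode declared dead after " + expireMs + " ms"); // 630000 by default
  }
}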