
Source Code Walkthrough: ApplicationMaster

As described in the previous article, the YARN cluster assembles and executes the command ${JAVA_HOME}/bin/java -server org.apache.spark.deploy.yarn.ApplicationMaster <arguments>...

1. The process runs ApplicationMaster

In the ApplicationMaster source, the entry point is the main method:

```scala
def main(args: Array[String]): Unit = {
  SignalUtils.registerLogger(log)
  // Parse the command-line arguments
  val amArgs = new ApplicationMasterArguments(args)
  val sparkConf = new SparkConf()
  if (amArgs.propertiesFile != null) {
    // Copy every entry of the properties file into SparkConf
    Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) =>
      sparkConf.set(k, v)
    }
  }
  // ...
  val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
  // Create the AM from the parsed arguments and configuration
  master = new ApplicationMaster(amArgs, sparkConf, yarnConf)

  // ... user identity verification logic omitted
  ugi.doAs(new PrivilegedExceptionAction[Unit]() {
    // Call run() and use its return value as the process exit code
    override def run(): Unit = System.exit(master.run())
  })
}
```
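The properties-file step in main copies key/value pairs into SparkConf one by one. The self-contained sketch below illustrates that idea. It parses a .properties stream with java.util.Properties and applies each entry to a minimal configuration object. PropertiesMergeSketch and its Conf class are hypothetical stand-ins, not Spark's SparkConf or Utils.getPropertiesFromFile. Trimming values is a simplifying assumption of the sketch.

```scala
import java.io.{Reader, StringReader}
import java.util.Properties
import scala.collection.mutable

object PropertiesMergeSketch {
  // Parse a .properties stream into an immutable map.
  // Assumption: values are trimmed, a simplification for this sketch.
  def loadProperties(reader: Reader): Map[String, String] = {
    val props = new Properties()
    try props.load(reader) finally reader.close()
    props.stringPropertyNames().toArray(Array.empty[String])
      .map(key => key -> props.getProperty(key).trim)
      .toMap
  }

  // Hypothetical stand-in for SparkConf: a later set() for the same key wins.
  final class Conf {
    private val settings = mutable.LinkedHashMap.empty[String, String]
    def set(key: String, value: String): Conf = { settings(key) = value; this }
    def get(key: String): Option[String] = settings.get(key)
  }

  def main(args: Array[String]): Unit = {
    // In the AM this would be a reader over amArgs.propertiesFile.
    val file = new StringReader("spark.app.name=WordCount\nspark.executor.memory = 2g\n")
    val conf = new Conf
    loadProperties(file).foreach { case (k, v) => conf.set(k, v) }
    println(conf.get("spark.executor.memory")) // Some(2g)
  }
}
```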

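ApplicationMaster.scala shows that the AM holds a client for communicating with the ResourceManager (RM). That client manages the AM's connection to the RM. Back in the main method, the last step is the call to run(). In cluster mode, run() calls runDriver(). In client mode it calls runExecutorLauncher() instead.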
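The sketch below shows the shape of the branch inside run(), assuming the two paths are passed in as functions. RunDispatchSketch and its exit-code values are illustrative, not Spark's constants. The point is only that run() picks a path by deploy mode and turns the outcome into the integer that main hands to System.exit.

```scala
import scala.util.control.NonFatal

object RunDispatchSketch {
  // Illustrative exit codes for this sketch (assumption, not Spark's constants).
  val ExitSuccess = 0
  val ExitUncaughtException = 10

  // Shape of run(): cluster mode runs the Driver inside the AM; client mode, where the
  // Driver runs on the submitting machine, only launches Executors. An uncaught
  // non-fatal failure becomes a non-zero code for main() to pass to System.exit.
  def run(isClusterMode: Boolean)(runDriver: () => Unit, runExecutorLauncher: () => Unit): Int =
    try {
      if (isClusterMode) runDriver() else runExecutorLauncher()
      ExitSuccess
    } catch {
      case NonFatal(_) => ExitUncaughtException
    }
}
```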

2. Starting the Driver thread

In the runDriver() method:

```scala
private def runDriver(): Unit = {
  addAmIpFilter(None, System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV))
  // Start the user's application (e.g. a WordCount program) on a separate thread
  userClassThread = startUserApplication()
  // ...

  try {
    // Block (via the ThreadUtils helper) until the user code has created a SparkContext,
    // or until totalWaitTime ms have elapsed
    val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
      Duration(totalWaitTime, TimeUnit.MILLISECONDS))
    if (sc != null) {
      val rpcEnv = sc.env.rpcEnv

      val userConf = sc.getConf
      val host = userConf.get(DRIVER_HOST_ADDRESS)
      val port = userConf.get(DRIVER_PORT)
      // Register this AM with the ResourceManager
      registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)

      val driverRef = rpcEnv.setupEndpointRef(
        RpcAddress(host, port),
        YarnSchedulerBackend.ENDPOINT_NAME)
      // Set up the allocator that requests Executor containers from YARN
      createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)
    } else {
      // Sanity check; should never happen in normal operation, since sc should only be null
      // if the user app did not create a SparkContext.
      throw new IllegalStateException("User did not initialize spark context!")
    }
    // With the AM registered and Executor allocation under way, let the Driver continue
    resumeDriver()
    userClassThread.join()
  } catch {
    case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
      logError(
        s"SparkContext did not initialize after waiting for $totalWaitTime ms. " +
         "Please check earlier log output for errors. Failing the application.")
      finish(FinalApplicationStatus.FAILED,
        ApplicationMaster.EXIT_SC_NOT_INITED,
        "Timed out waiting for SparkContext.")
  } finally {
    resumeDriver()
  }
}
```
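The core of runDriver() is a hand-off through a Promise. The user thread completes it, while the AM thread blocks with a timeout. The sketch below reproduces that pattern with scala.concurrent.Promise and Await.result, using a placeholder FakeContext in place of SparkContext. AwaitContextSketch and its Outcome cases are hypothetical names. Like startUserApplication, the sketch completes the promise with null in a finally block. That way, a user program that exits without creating a context unblocks the waiter immediately instead of running into the timeout.

```scala
import java.util.concurrent.{TimeUnit, TimeoutException}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

object AwaitContextSketch {
  // Hypothetical stand-in for SparkContext.
  final case class FakeContext(appName: String)

  sealed trait Outcome
  final case class Ready(ctx: FakeContext) extends Outcome
  case object ExitedWithoutContext extends Outcome
  case object TimedOut extends Outcome

  // Run `userMain` on its own thread; it may complete the promise with a context.
  // The calling (AM) thread blocks until the promise completes or `waitMillis` elapses.
  def awaitContext(userMain: Promise[FakeContext] => Unit, waitMillis: Long): Outcome = {
    val promise = Promise[FakeContext]()
    val userThread = new Thread(() => {
      try userMain(promise)
      finally promise.trySuccess(null) // no-op if userMain already completed it
    })
    userThread.setDaemon(true)
    userThread.start()
    try {
      Option(Await.result(promise.future, Duration(waitMillis, TimeUnit.MILLISECONDS))) match {
        case Some(ctx) => Ready(ctx)
        case None      => ExitedWithoutContext // runDriver throws IllegalStateException here
      }
    } catch {
      case _: TimeoutException => TimedOut // runDriver calls finish(FAILED, ...) here
    }
  }
}
```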

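runDriver() calls startUserApplication() first, so let's look at what it does. startUserApplication() returns as soon as the user thread has been started. The ThreadUtils.awaitResult call that follows is what blocks until the user code has created a SparkContext.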

```scala
private def startUserApplication(): Thread = {
  logInfo("Starting the user application in a separate Thread")

  // ... (omitted)
  // Load userClass (our application's main class) and look up its main(String[]) method
  val mainMethod = userClassLoader.loadClass(args.userClass)
    .getMethod("main", classOf[Array[String]])

  // Create the thread that will run the user code
  val userThread = new Thread {
    override def run(): Unit = {
      try {
        // The user class's main method must be static
        if (!Modifier.isStatic(mainMethod.getModifiers)) {
          logError(s"Could not find static main method in object ${args.userClass}")
          finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_EXCEPTION_USER_CLASS)
        } else {
          // Invoke the user's main method
          mainMethod.invoke(null, userArgs.toArray)
          finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
          logDebug("Done running user class")
        }
      } catch {
        // ... (exception handling omitted)
      } finally {
        // Unblock runDriver() even if the user code never created a SparkContext
        sparkContextPromise.trySuccess(null)
      }
    }
  }
  userThread.setContextClassLoader(userClassLoader)
  // Name the thread "Driver"
  userThread.setName("Driver")
  // Start the thread
  userThread.start()
  userThread
}
```
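startUserApplication() relies on two mechanics. It does a reflective lookup that rejects non-static methods, and it runs the user code on a dedicated thread named "Driver". A self-contained example cannot ship a user jar, so the sketch below is generic over the method name and is exercised on JDK methods instead of a user's main. StaticInvokeSketch is a hypothetical helper. Unlike Spark's code, it neither sets the thread's context class loader nor handles exceptions thrown by the invoked method.

```scala
import java.lang.reflect.{Method, Modifier}

object StaticInvokeSketch {
  // Load `className` through `loader`, look up `name(paramTypes...)`, and accept it only
  // if it is static: the same check startUserApplication applies to the user's main.
  def findStaticMethod(loader: ClassLoader, className: String, name: String,
                       paramTypes: Class[_]*): Either[String, Method] = {
    val method = loader.loadClass(className).getMethod(name, paramTypes: _*)
    if (Modifier.isStatic(method.getModifiers)) Right(method)
    else Left(s"Could not find static $name method in $className")
  }

  // Invoke a static method (null receiver) on a new thread named "Driver" and wait for it.
  // Simplification: exceptions thrown by the method are not captured here.
  def invokeOnDriverThread(method: Method, args: AnyRef*): AnyRef = {
    var result: AnyRef = null
    val driver = new Thread(() => { result = method.invoke(null, args: _*) })
    driver.setName("Driver")
    driver.start()
    driver.join() // join() also makes `result` visible to this thread
    result
  }
}
```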

userClass is simply the class name originally passed in with --class. At this point the Driver thread has started, and it goes on to run the WordCount program we wrote.
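A simplified argument parser shows how a --class value ends up in userClass. The sketch handles only a subset of flags (--jar, --class, --arg, --properties-file). It is not a copy of ApplicationMasterArguments, which supports more options. AmArgsSketch and org.example.WordCount are hypothetical names.

```scala
object AmArgsSketch {
  // Simplified stand-in for ApplicationMasterArguments (only a subset of flags).
  final case class AmArgs(
      userJar: Option[String] = None,
      userClass: Option[String] = None,
      userArgs: Vector[String] = Vector.empty,
      propertiesFile: Option[String] = None)

  // Consume the argument list two tokens at a time; repeated --arg values accumulate.
  @scala.annotation.tailrec
  def parse(args: List[String], acc: AmArgs = AmArgs()): AmArgs = args match {
    case "--jar" :: value :: tail             => parse(tail, acc.copy(userJar = Some(value)))
    case "--class" :: value :: tail           => parse(tail, acc.copy(userClass = Some(value)))
    case "--arg" :: value :: tail             => parse(tail, acc.copy(userArgs = acc.userArgs :+ value))
    case "--properties-file" :: value :: tail => parse(tail, acc.copy(propertiesFile = Some(value)))
    case Nil                                  => acc
    case other :: _                           => throw new IllegalArgumentException(s"Unsupported argument: $other")
  }
}
```

For example, parsing List("--class", "org.example.WordCount", "--arg", "in") yields userClass = Some("org.example.WordCount") and userArgs = Vector("in").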

3. Resuming the Driver

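Back in runDriver(): once the preceding code has finished (including the Executor allocation covered in the next chapter), runDriver() calls resumeDriver() to let the Driver thread continue.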
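Why does the Driver need to be resumed at all? In the Spark source, SparkContext startup in YARN cluster mode calls ApplicationMaster.sparkContextInitialized. That method completes sparkContextPromise and then waits on it, so the user's job does not proceed until the AM has registered and set up allocation. resumeDriver() is the matching notify. The sketch below models this handshake with a Promise plus wait/notifyAll on its monitor, using a String in place of SparkContext. ResumeHandshake and its method names are hypothetical. The resumed flag is an addition for robustness: it guards against spurious wake-ups and against a resume that arrives before the wait.

```scala
import scala.concurrent.{Future, Promise}

// The Driver thread publishes a value and parks; the AM thread later releases it.
final class ResumeHandshake[T] {
  private val promise = Promise[T]()
  private var resumed = false // guarded by promise's monitor

  def future: Future[T] = promise.future

  // Driver side (the role sparkContextInitialized plays): publish, then park.
  def publishAndWait(value: T): Unit = promise.synchronized {
    promise.success(value)
    while (!resumed) promise.wait() // loop tolerates spurious wake-ups
  }

  // AM side (the role resumeDriver() plays); calling it twice, as runDriver does, is harmless.
  def resume(): Unit = promise.synchronized {
    resumed = true
    promise.notifyAll()
  }
}
```

Completing the promise and entering wait() happen under the same monitor. As a result, the AM's resume() cannot slip in between them and be lost.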