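Source Code: ApplicationMaster
As described in the previous article, the YARN cluster assembles and executes a command of the form ${JAVA_HOME}/bin/java -server org.apache.spark.deploy.yarn.ApplicationMaster command arguments...
1. The process runs ApplicationMaster
Looking at the ApplicationMaster source, the main entry point is: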
scala
def main(args: Array[String]): Unit = {
  SignalUtils.registerLogger(log)
  // Parse the command-line arguments
  val amArgs = new ApplicationMasterArguments(args)
  val sparkConf = new SparkConf()
  if (amArgs.propertiesFile != null) {
    Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) =>
      sparkConf.set(k, v)
    }
  }
  ......
  val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
  // Create the AM from the arguments and configuration
  master = new ApplicationMaster(amArgs, sparkConf, yarnConf)
  ...... (some user identity verification logic)
  ugi.doAs(new PrivilegedExceptionAction[Unit]() {
    // Execute the run method
    override def run(): Unit = System.exit(master.run())
  })
}
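The property-loading step at the top of main can be tried out in isolation. What follows is only a minimal sketch under stated assumptions: a plain mutable map stands in for SparkConf, java.util.Properties stands in for Utils.getPropertiesFromFile, and `ConfSketch` and `loadInto` are hypothetical names that do not exist in Spark.

```scala
import java.io.StringReader
import java.util.Properties
import scala.collection.mutable

// Hypothetical stand-in for the "properties file -> SparkConf" step in main.
object ConfSketch {
  // Copy every key/value pair from properties-format text into conf,
  // overwriting any existing entry that has the same key.
  def loadInto(conf: mutable.Map[String, String], text: String): Unit = {
    val props = new Properties()
    props.load(new StringReader(text))
    val names = props.stringPropertyNames().iterator()
    while (names.hasNext) {
      val key = names.next()
      conf(key) = props.getProperty(key).trim
    }
  }
}
```

The point to notice is the ordering: values read from the file overwrite whatever the freshly created configuration already held, which mirrors the sparkConf.set(k, v) loop above.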
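Looking at ApplicationMaster.scala, you can see that it holds a client for communicating with the RM (ResourceManager); this client is what manages the connection to the RM.
Back in the main method of ApplicationMaster, the last step is to call the run method, which in cluster mode goes on to call runDriver().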
2. Starting the Driver thread
In the runDriver() method:
scala
private def runDriver(): Unit = {
  addAmIpFilter(None, System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV))
  // Start the user's application, e.g. a WordCount program
  userClassThread = startUserApplication()
  ......
  try {
    // Block here until the SparkContext is available (or the wait times out)
    val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
      Duration(totalWaitTime, TimeUnit.MILLISECONDS))
    if (sc != null) {
      val rpcEnv = sc.env.rpcEnv
      val userConf = sc.getConf
      val host = userConf.get(DRIVER_HOST_ADDRESS)
      val port = userConf.get(DRIVER_PORT)
      registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)
      val driverRef = rpcEnv.setupEndpointRef(
        RpcAddress(host, port),
        YarnSchedulerBackend.ENDPOINT_NAME)
      createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)
    } else {
      // Sanity check; should never happen in normal operation, since sc should only be null
      // if the user app did not create a SparkContext.
      throw new IllegalStateException("User did not initialize spark context!")
    }
    // Once resources are allocated and the Driver and Executors are up, let the Driver continue
    resumeDriver()
    userClassThread.join()
  } catch {
    case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
      logError(
        s"SparkContext did not initialize after waiting for $totalWaitTime ms. " +
        "Please check earlier log output for errors. Failing the application.")
      finish(FinalApplicationStatus.FAILED,
        ApplicationMaster.EXIT_SC_NOT_INITED,
        "Timed out waiting for SparkContext.")
  } finally {
    resumeDriver()
  }
}
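The blocking wait on sparkContextPromise is the heart of runDriver(), and the pattern can be reproduced without Spark. The sketch below is an illustration under assumptions, not the Spark implementation: a String stands in for SparkContext, scala.concurrent.Await replaces ThreadUtils.awaitResult (so the timeout surfaces as a plain TimeoutException rather than wrapped in a SparkException), and `HandshakeSketch` and `awaitContext` are hypothetical names.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Hypothetical miniature of the AM / user-thread handshake.
object HandshakeSketch {
  // Starts a "user" thread that publishes a context after delayMs,
  // then waits up to waitMs for it, like the awaitResult call in runDriver().
  def awaitContext(delayMs: Long, waitMs: Long): Either[String, String] = {
    val contextPromise = Promise[String]()
    val userThread = new Thread {
      override def run(): Unit = {
        try {
          Thread.sleep(delayMs)                    // pretend to run user code
          contextPromise.trySuccess("context-ready")
        } finally {
          // Same idea as sparkContextPromise.trySuccess(null):
          // a no-op if the promise has already been completed.
          contextPromise.trySuccess(null)
        }
      }
    }
    userThread.setName("Driver")
    userThread.setDaemon(true)                     // do not keep the JVM alive
    userThread.start()
    try {
      val ctx = Await.result(contextPromise.future, waitMs.millis)
      if (ctx != null) Right(ctx) else Left("user code never created a context")
    } catch {
      case _: TimeoutException => Left("timed out waiting for context")
    }
  }
}
```

Calling HandshakeSketch.awaitContext(10, 2000) takes the success path, while HandshakeSketch.awaitContext(2000, 50) takes the timeout path, which corresponds to the EXIT_SC_NOT_INITED branch above.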
startUserApplication() has to return before the code after it can continue. Let's look at what it does:
scala
private def startUserApplication(): Thread = {
  logInfo("Starting the user application in a separate Thread")
  ..... (omitted)
  // Look up the main method of userClass, i.e. the main class of our application
  val mainMethod = userClassLoader.loadClass(args.userClass)
    .getMethod("main", classOf[Array[String]])
  // Create the thread
  val userThread = new Thread {
    override def run(): Unit = {
      try {
        // Check that the main method of userClass is static
        if (!Modifier.isStatic(mainMethod.getModifiers)) {
          logError(s"Could not find static main method in object ${args.userClass}")
          finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_EXCEPTION_USER_CLASS)
        } else {
          // Invoke the user's main method
          mainMethod.invoke(null, userArgs.toArray)
          finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
          logDebug("Done running user class")
        }
      } catch {
        ......
      } finally {
        sparkContextPromise.trySuccess(null)
      }
    }
  }
  userThread.setContextClassLoader(userClassLoader)
  // The thread is named "Driver"
  userThread.setName("Driver")
  // Start the thread
  userThread.start()
  userThread
}
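userClass is simply the class name that was passed in via --class at the very beginning.
At this point the Driver thread has been started. The Driver goes on to run the WordCount program we wrote.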
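The reflective launch can also be tried outside Spark. This is a rough sketch under assumptions: `DemoApp` is a hypothetical user application, `ReflectSketch` and `runMain` are made-up names, Class.forName replaces the user class loader, and the code is assumed to be compiled as top-level definitions in the default package (a top-level Scala object gets a static main forwarder; an object nested in a script or REPL wrapper does not).

```scala
import java.lang.reflect.Modifier

// Hypothetical user application standing in for the --class argument.
object DemoApp {
  @volatile var seenArgs: Seq[String] = Nil
  def main(args: Array[String]): Unit = { seenArgs = args.toSeq }
}

object ReflectSketch {
  // Look up main(String[]) by class name and run it on a thread named "Driver".
  def runMain(className: String, userArgs: Seq[String]): Thread = {
    val mainMethod = Class.forName(className)
      .getMethod("main", classOf[Array[String]])
    val userThread = new Thread {
      override def run(): Unit = {
        if (!Modifier.isStatic(mainMethod.getModifiers)) {
          // A non-static main cannot be invoked with a null receiver.
          System.err.println(s"Could not find static main method in $className")
        } else {
          // The whole array is passed as the single argument of main.
          mainMethod.invoke(null, userArgs.toArray)
        }
      }
    }
    userThread.setName("Driver")
    userThread.start()
    userThread
  }
}
```

For example, ReflectSketch.runMain("DemoApp", Seq("a", "b")).join() runs DemoApp.main on the "Driver" thread, after which DemoApp.seenArgs holds the two arguments.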
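3. Resuming the Driver
Back in the runDriver() method, once all the preceding code has finished (for example, once the Executors covered in the next chapter have also been started), the following resumeDriver() code is executed: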