
Spark Core Source Code: SparkSubmit

1. Overview of the Spark Execution Flow

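The flow proceeds through the following core steps:

  1. After a job is submitted, the Driver program is always started first.
  2. The Driver then registers the application with the cluster manager.
  3. The cluster manager allocates Executors according to the job's configuration and launches them.
  4. The Driver runs the main function. Spark queries are lazily evaluated: only when an action operator is reached does Spark trace back through the lineage, splitting it into Stages at wide (shuffle) dependencies. Each Stage maps to one TaskSet containing multiple Tasks, which are scheduled onto available Executors.
  5. Following the data-locality principle, each Task is dispatched to a chosen Executor. While tasks run, the Executors keep communicating with the Driver to report task progress.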
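Step 4 can be illustrated with a small, self-contained sketch. It does not use Spark's real RDD or DAGScheduler classes: `Node`, `Narrow`, `Shuffle`, and `StageSplitter` are hypothetical names for a toy lineage model. Walking back from the final node, narrow dependencies are pipelined into the current stage, while each shuffle (wide) dependency cuts the lineage and starts a parent stage that must run first.

```scala
// Toy lineage model (hypothetical; not Spark's RDD/Dependency classes).
sealed trait Dep { def parent: Node }
final case class Narrow(parent: Node) extends Dep  // e.g. map, filter
final case class Shuffle(parent: Node) extends Dep // e.g. reduceByKey, groupByKey
final case class Node(name: String, deps: List[Dep] = Nil)

object StageSplitter {
  /** Splits the lineage ending at `finalNode` into stages (lists of node names),
    * parent stages first, cutting at every shuffle dependency. */
  def stages(finalNode: Node): List[List[String]] = {
    // Follow narrow deps to gather one pipelined stage; remember shuffle parents.
    def collect(n: Node): (List[String], List[Node]) =
      n.deps.foldLeft((List(n.name), List.empty[Node])) {
        case ((names, cuts), Narrow(p)) =>
          val (pNames, pCuts) = collect(p)
          (names ++ pNames, cuts ++ pCuts)
        case ((names, cuts), Shuffle(p)) =>
          (names, cuts :+ p)
      }

    val (names, shuffleParents) = collect(finalNode)
    // Parent stages come first; distinct drops duplicates from shared ancestors.
    (shuffleParents.flatMap(stages) :+ names.distinct).distinct
  }
}
```

For a word-count style lineage (textFile, flatMap, map, then reduceByKey), this yields two stages: one pipelining textFile, flatMap, and map, and one for reduceByKey. In Spark itself the first would be a ShuffleMapStage and the last a ResultStage.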

2. Job Submission Entry Point

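  1. In the spark-shell script: spark-shell delegates to spark-submit, passing --class org.apache.spark.repl.Main.
  2. In the spark-submit script: spark-submit runs spark-class with org.apache.spark.deploy.SparkSubmit, so the program entry point is the companion object of the org.apache.spark.deploy.SparkSubmit class. Its main method calls submit.doSubmit(args), which parses the command-line arguments and dispatches on them: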
```scala
def doSubmit(args: Array[String]): Unit = {
  // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
  // be reset before the application starts.
  val uninitLog = initializeLogIfNecessary(true, silent = true)
  // Parse the command-line arguments
  val appArgs = parseArguments(args)
  if (appArgs.verbose) {
    logInfo(appArgs.toString)
  }
  // Pattern-match on the action to decide what Spark should do
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    case SparkSubmitAction.PRINT_VERSION => printVersion()
  }
}
```

3. Command-Line Argument Parsing

At line 98 of SparkSubmit.scala, parseArguments() wraps the arguments in a SparkSubmitArguments object, whose constructor does the actual parsing:

```scala
protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
  new SparkSubmitArguments(args)
}
```

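Line 110 of SparkSubmitArguments.scala is where the arguments are parsed. The parsing logic (inherited from SparkSubmitOptionParser) matches options with a regular expression, and the resulting values are stored back into the SparkSubmitArguments object. The action is resolved here as well: if no action is specified, it defaults to SUBMIT.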
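A minimal sketch of this kind of parsing, assuming only a handful of options. `SubmitAction`, `ParsedArgs`, and `ArgParser` are hypothetical names: the `--key=value` regex follows the same idea as described above, but the option set and the order in which actions are checked are simplified and are not Spark's actual tables.

```scala
import scala.annotation.tailrec
import scala.util.matching.Regex

// Simplified stand-in for Spark's SparkSubmitAction enumeration.
object SubmitAction extends Enumeration {
  val SUBMIT, KILL, REQUEST_STATUS, PRINT_VERSION = Value
}

final case class ParsedArgs(
    options: Map[String, String],
    action: SubmitAction.Value,
    primaryResource: Option[String],
    childArgs: List[String])

object ArgParser {
  // "--key=value" form, matched with a regular expression.
  private val EqSeparated: Regex = "(--[^=]+)=(.+)".r
  // Hypothetical subset of options: some take a value, some are plain switches.
  private val takesValue = Set("--master", "--deploy-mode", "--class", "--name", "--kill", "--status")
  private val switches = Set("--verbose", "--version")

  def parse(args: List[String]): ParsedArgs = {
    @tailrec
    def loop(rest: List[String], opts: Map[String, String]): (Map[String, String], List[String]) =
      rest match {
        case EqSeparated(key, value) :: tail => loop(tail, opts + (key -> value))
        case key :: value :: tail if takesValue(key) => loop(tail, opts + (key -> value))
        case key :: tail if switches(key) => loop(tail, opts + (key -> "true"))
        // First non-option token: primary resource (e.g. the jar), then app arguments.
        case remaining => (opts, remaining)
      }

    val (opts, remaining) = loop(args, Map.empty)
    val action =
      if (opts.contains("--kill")) SubmitAction.KILL
      else if (opts.contains("--status")) SubmitAction.REQUEST_STATUS
      else if (opts.contains("--version")) SubmitAction.PRINT_VERSION
      else SubmitAction.SUBMIT // no explicit action: default to SUBMIT
    ParsedArgs(opts, action, remaining.headOption, remaining.drop(1))
  }
}
```

Stopping at the first non-option token keeps the application's own arguments (which may themselves look like `--flags`) from being consumed by the launcher.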

4. Executing According to the Action Argument

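Back in SparkSubmit.scala, when the action is SUBMIT the program calls submit(). submit() in turn calls doRunMain(); since no proxy-user argument was passed, doRunMain() takes the else branch and calls runMain():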

```scala
def doRunMain(): Unit = {
  if (args.proxyUser != null) {
    // ...
  } else {
    // Run the main function
    runMain(args, uninitLog)
  }
}
```

In runMain() at line 954 of SparkSubmit.scala, the submission environment is prepared first:

```scala
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  // Prepare the submission environment; childMainClass is the name of the class
  // that is loaded and instantiated further below
  val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
  // ...

  var mainClass: Class[_] = null

  try {
    mainClass = Utils.classForName(childMainClass)
  } catch {
    // ...
  }
  // If mainClass implements SparkApplication, instantiate it through its no-arg constructor
  // via reflection; otherwise wrap it in a JavaMainApplication, which calls its static main()
  val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
    mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
  } else {
    new JavaMainApplication(mainClass)
  }

  // ...

  try {
    app.start(childArgs.toArray, sparkConf)
  } catch {
    case t: Throwable =>
      throw findCause(t)
  } finally {
    if (args.master.startsWith("k8s") && !isShell(args.primaryResource) &&
        !isSqlShell(args.mainClass) && !isThriftServer(args.mainClass) &&
        !isConnectServer(args.mainClass)) {
      try {
        SparkContext.getActive.foreach(_.stop())
      } catch {
        case e: Throwable => logError(s"Failed to close SparkContext: $e")
      }
    }
  }
}
```
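The two branches that create `app` in runMain() can be reproduced with plain JDK reflection. In this sketch, `AppLike` stands in for SparkApplication and `MainMethodApp` for JavaMainApplication; both names, as well as the `EchoApp` example class and the `AppFactory` helper, are hypothetical.

```scala
import java.lang.reflect.Modifier

// Hypothetical stand-in for SparkApplication.
trait AppLike {
  def start(args: Array[String]): Unit
}

// Hypothetical stand-in for JavaMainApplication: delegates to a static main(String[]).
final class MainMethodApp(klass: Class[_]) extends AppLike {
  override def start(args: Array[String]): Unit = {
    val main = klass.getMethod("main", classOf[Array[String]])
    require(Modifier.isStatic(main.getModifiers), s"${klass.getName}.main must be static")
    main.invoke(null, args) // null receiver because main is static
  }
}

object AppFactory {
  // Same decision as runMain(): reflectively instantiate AppLike implementations,
  // otherwise wrap the class so that start() calls its main method.
  def create(mainClass: Class[_]): AppLike =
    if (classOf[AppLike].isAssignableFrom(mainClass))
      mainClass.getConstructor().newInstance().asInstanceOf[AppLike]
    else
      new MainMethodApp(mainClass)
}

// Example AppLike implementation with a public no-arg constructor.
class EchoApp extends AppLike {
  var received: List[String] = Nil
  override def start(args: Array[String]): Unit = {
    received = args.toList
  }
}
```

Wrapping plain main classes lets the launcher call start() uniformly, whether the target is a framework-provided application class or an ordinary user program.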

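In YARN cluster mode (--deploy-mode cluster), childMainClass is org.apache.spark.deploy.yarn.YarnClusterApplication; in client mode it is simply the user's main class. To read the YARN-side source, add the spark-yarn dependency: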

```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-yarn_2.12</artifactId>
    <version>3.4.2</version>
</dependency>
```
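The choice of childMainClass can be summarized as a small lookup. This is a heavily simplified sketch: `ChildMainClass.choose` is a hypothetical helper that covers only YARN and Kubernetes cluster mode plus client mode, and it ignores the Python, R, shell, and standalone cases that prepareSubmitEnvironment() also handles.

```scala
object ChildMainClass {
  val YarnCluster = "org.apache.spark.deploy.yarn.YarnClusterApplication"
  val K8sCluster = "org.apache.spark.deploy.k8s.submit.KubernetesClientApplication"

  // Hypothetical helper: (master, deploy mode) -> class that runMain() will load.
  def choose(master: String, deployMode: String, userMainClass: String): String =
    (master, deployMode) match {
      // Client mode: the driver runs locally, so the user's main class is loaded directly.
      case (_, "client") => userMainClass
      // Cluster mode: load a submission client that launches the driver remotely.
      case ("yarn", "cluster") => YarnCluster
      case (m, "cluster") if m.startsWith("k8s://") => K8sCluster
      case other =>
        throw new IllegalArgumentException(s"Not covered by this sketch: $other")
    }
}
```

Throwing for unhandled combinations keeps the sketch from silently pretending to cover cluster managers it does not model.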

YarnClusterApplication is defined in Client.scala: it extends SparkApplication and implements the start method.

5. Executing the start Method

The start() method creates a Client object. Looking at the Client class source, it has a yarnClient field, which is backed by the YarnClientImpl class:

```java
public static YarnClient createYarnClient() {
    YarnClient client = new YarnClientImpl();
    return client;
}
```

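Inside YarnClientImpl, rmClient is the client object for the ResourceManager, the node responsible for resource scheduling. Back in Client.scala, the Client object then executes its run() method: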

```scala
def run(): Unit = {
  // Submit the application
  submitApplication()
  // Application-monitoring code omitted
  // ...
}
```
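The omitted monitoring part of run() essentially polls the ResourceManager for the application's report until it reaches a terminal state (in Spark, the polling interval is controlled by spark.yarn.report.interval). The loop below sketches that pattern only: `AppMonitor` and the `fetchState` callback are hypothetical, and the state names come from YARN's YarnApplicationState, where FINISHED, FAILED, and KILLED are terminal.

```scala
import scala.annotation.tailrec

object AppMonitor {
  // YarnApplicationState values after which the application no longer changes state.
  val Terminal: Set[String] = Set("FINISHED", "FAILED", "KILLED")

  /** Polls `fetchState` every `intervalMs` ms until a terminal state is reported.
    * Returns the terminal state and each distinct state change observed, in order. */
  def waitForCompletion(fetchState: () => String, intervalMs: Long): (String, List[String]) = {
    @tailrec
    def loop(changes: List[String]): (String, List[String]) = {
      val state = fetchState()
      // Record the state only when it differs from the previous report.
      val updated = if (changes.lastOption.contains(state)) changes else changes :+ state
      if (Terminal.contains(state)) (state, updated)
      else {
        Thread.sleep(intervalMs)
        loop(updated)
      }
    }
    loop(Nil)
  }
}
```

Logging only state changes, rather than every poll, is what keeps a long-running application from flooding the client's console.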

The submitApplication() method is as follows:

```scala
def submitApplication(): Unit = {
  ResourceRequestHelper.validateResources(sparkConf)

  try {
    launcherBackend.connect()
    yarnClient.init(hadoopConf)
    // Start the YARN client and connect to the YARN cluster
    yarnClient.start()

    // Ask the ResourceManager to create a new application
    val newApp = yarnClient.createApplication()
    val newAppResponse = newApp.getNewApplicationResponse()
    // The response carries a globally unique application ID
    this.appId = newAppResponse.getApplicationId()
    // ...

    // Build the launch context for the ApplicationMaster container,
    // including its launch command and arguments
    val containerContext = createContainerLaunchContext()
    // Build the application submission context: name, priority, memory size, and so on
    val appContext = createApplicationSubmissionContext(newApp, containerContext)

    // Finally, submit and monitor the application
    logInfo(s"Submitting application $appId to ResourceManager")
    // Submit the application submission context to the ResourceManager
    yarnClient.submitApplication(appContext)
    launcherBackend.setAppId(appId.toString)
    reportLauncherState(SparkAppHandle.State.SUBMITTED)
  } catch {
    // ...
  }
}
```
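The order of calls in submitApplication() (obtain an application ID, build the launch and submission contexts, then submit) can be modeled without a cluster. Everything here is hypothetical: `FakeResourceManager`, `LaunchContext`, `SubmissionContext`, and `SubmitFlow` only imitate the shape of the YARN client interaction, and the ID string merely mimics YARN's `application_<clusterTimestamp>_<sequence>` format.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-ins for the contexts built in submitApplication().
final case class LaunchContext(command: Seq[String])
final case class SubmissionContext(appId: String, appName: String, amMemoryMb: Int, launch: LaunchContext)

// Hypothetical in-memory "ResourceManager"; not the real YARN API.
final class FakeResourceManager(clusterTimestamp: Long) {
  private val sequence = new AtomicInteger(0)
  private var submitted = Map.empty[String, SubmissionContext]

  // Plays the role of createApplication(): returns a new cluster-unique ID.
  def newApplicationId(): String =
    f"application_${clusterTimestamp}_${sequence.incrementAndGet()}%04d"

  // Plays the role of submitApplication(appContext).
  def submit(ctx: SubmissionContext): Unit = synchronized {
    require(!submitted.contains(ctx.appId), s"${ctx.appId} was already submitted")
    submitted += ctx.appId -> ctx
  }

  def isSubmitted(appId: String): Boolean = synchronized { submitted.contains(appId) }
}

object SubmitFlow {
  // Same order as Client.submitApplication(): get an ID, build contexts, submit.
  def submit(rm: FakeResourceManager, appName: String, amMemoryMb: Int, command: Seq[String]): String = {
    val appId = rm.newApplicationId()
    val launch = LaunchContext(command)
    val ctx = SubmissionContext(appId, appName, amMemoryMb, launch)
    rm.submit(ctx)
    appId
  }
}
```

Getting the ID before building the contexts matters because the contexts (staging paths, logs, tags) are keyed by that ID.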

Next, step into the createContainerLaunchContext() method.
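As a rough preview of what createContainerLaunchContext() assembles, the sketch below builds the ApplicationMaster's launch command as a list of tokens. In cluster mode the AM class is org.apache.spark.deploy.yarn.ApplicationMaster (which also hosts the driver), while in client mode it is org.apache.spark.deploy.yarn.ExecutorLauncher. `AmCommand.build` is a hypothetical helper, and the real method adds many more JVM options, classpath settings, and arguments than shown here.

```scala
object AmCommand {
  // Hypothetical, simplified version of the AM command built for YARN.
  def build(isClusterMode: Boolean, userClass: String, amMemoryMb: Int): Seq[String] = {
    // Cluster mode: the AM hosts the driver. Client mode: the AM only requests executors.
    val amClass =
      if (isClusterMode) "org.apache.spark.deploy.yarn.ApplicationMaster"
      else "org.apache.spark.deploy.yarn.ExecutorLauncher"
    // The user's main class is passed to the AM only in cluster mode.
    val userArgs = if (isClusterMode) Seq("--class", userClass) else Seq.empty[String]
    // {{JAVA_HOME}} and <LOG_DIR> are placeholders that YARN expands when the container starts.
    Seq("{{JAVA_HOME}}/bin/java", "-server", s"-Xmx${amMemoryMb}m", amClass) ++
      userArgs ++
      Seq("1>", "<LOG_DIR>/stdout", "2>", "<LOG_DIR>/stderr")
  }
}
```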