RDD Dependencies

1. RDD Lineage

RDDs support only coarse-grained transformations, i.e., a single operation applied to a whole set of records. Spark records the lineage that created each RDD so that lost partitions can be recovered. An RDD's lineage records the RDD's metadata and the transformations applied to it; when some of the RDD's partitions are lost, Spark can use this information to recompute and recover exactly the missing partitions.
Print the lineage recorded by each RDD:

scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    val rdd1 = sc.makeRDD(List(1, 3, 5, 4, 2), 2)
    println(rdd1.toDebugString)
    println("--------------------------------------")
    val rdd2: RDD[(Int, Int)] = rdd1.map((_, 1))
    println(rdd2.toDebugString)
    println("--------------------------------------")
    val rdd3: RDD[(Int, Int)] = rdd2.reduceByKey(_ + _)
    println(rdd3.toDebugString)
    println("--------------------------------------")
    sc.stop()
}

Output:
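On a local run, the printed lineage looks roughly like the following (RDD ids and call sites vary by run; `Lineage.scala` is a hypothetical file name). The leading `(2)` is the partition count, and the indentation shift marks a shuffle boundary:

(2) ParallelCollectionRDD[0] at makeRDD at Lineage.scala:8 []
--------------------------------------
(2) MapPartitionsRDD[1] at map at Lineage.scala:11 []
 |  ParallelCollectionRDD[0] at makeRDD at Lineage.scala:8 []
--------------------------------------
(2) ShuffledRDD[2] at reduceByKey at Lineage.scala:14 []
 +-(2) MapPartitionsRDD[1] at map at Lineage.scala:11 []
    |  ParallelCollectionRDD[0] at makeRDD at Lineage.scala:8 []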

2. RDD Dependencies

A dependency is simply the relationship between two adjacent RDDs.
Inspect each RDD's dependencies:

scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    val rdd1 = sc.makeRDD(List(1, 3, 5, 4, 2), 2)
    println(rdd1.dependencies)
    println("--------------------------------------")
    val rdd2: RDD[(Int, Int)] = rdd1.map((_, 1))
    println(rdd2.dependencies)
    println("--------------------------------------")
    val rdd3: RDD[(Int, Int)] = rdd2.reduceByKey(_ + _)
    println(rdd3.dependencies)
    println("--------------------------------------")
    sc.stop()
}

Output (figure: RDD dependency graph):
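On a local run, the printed dependencies look roughly like this (the object hash suffixes vary): the source RDD has no dependencies, map adds a OneToOneDependency, and reduceByKey adds a ShuffleDependency.

List()
--------------------------------------
List(org.apache.spark.OneToOneDependency@63a5d002)
--------------------------------------
List(org.apache.spark.ShuffleDependency@7d9d0818)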

3. Narrow Dependencies

A narrow dependency means that each partition of the parent (upstream) RDD is used by at most one partition of the child (downstream) RDD. By analogy, a narrow dependency is like an only child.
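In the Spark source, narrow dependencies are modeled by NarrowDependency and its subclasses; the abridged definitions below include OneToOneDependency, the kind produced by map:

scala
// Dependency.scala (abridged): a narrow dependency maps each child partition
// to a small, deterministic set of parent partitions.
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  // Which parent partitions does child partition `partitionId` read from?
  def getParents(partitionId: Int): Seq[Int]
  override def rdd: RDD[T] = _rdd
}

// One-to-one parent/child partition mapping, e.g. the result of map or filter.
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}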

4. Wide Dependencies

A wide dependency means that a partition of the parent (upstream) RDD is depended on by multiple partitions of the child (downstream) RDD, which triggers a Shuffle. By analogy, a wide dependency is like having many children. In the Spark source, a wide dependency is represented by the ShuffleDependency class:

scala
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
    extends Dependency[Product2[K, V]]
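For example, reduceByKey creates a ShuffleDependency whose aggregator carries the reduce function; in that case mapSideCombine is true, so values are partially aggregated on the map side before the shuffle files are written.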

5. RDD Stage Division

Because a wide dependency triggers a Shuffle, the DAG is split into separate stages before and after each Shuffle, which lets the tasks within a stage execute without shuffling. For example, the rdd3 pipeline from section 1 contains one ShuffleDependency (from reduceByKey), so its collect() job is split into a ShuffleMapStage and a ResultStage.

5.1 Stage Division Source Walkthrough

scala
// Start from the collect() method
rdd3.collect()

// RDD.scala: step into runJob()
def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}
// SparkContext.scala: step into the next runJob() overload
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
  runJob(rdd, func, rdd.partitions.indices)  // just another wrapper layer
}
// SparkContext.scala: step into the next runJob() overload
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: Iterator[T] => U,
    partitions: Seq[Int]): Array[U] = {
  val cleanedFunc = clean(func)
  runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}
// SparkContext.scala: step into the next runJob() overload
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int]): Array[U] = {
  val results = new Array[U](partitions.size)
  runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
  results
}
// SparkContext.scala: step into dagScheduler.runJob()
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  .......
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}
// DAGScheduler.scala: step into submitJob()
def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  ......
}
// DAGScheduler.scala: eventProcessLoop.post() sends a JobSubmitted event;
// next, find the handler for the JobSubmitted event
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  val maxPartitions = rdd.partitions.length
  ......
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    Utils.cloneProperties(properties)))
  waiter
}
// DAGScheduler.scala: createResultStage() is where stages are created; step into it
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties): Unit = {
    var finalStage: ResultStage = null
    try {
        // New stage creation may throw an exception if, for example, jobs are run on a
        // HadoopRDD whose underlying HDFS files have been deleted.
        finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
        ......
    }
    ......
}
// DAGScheduler.scala: getOrCreateParentStages() gets the upstream shuffle
// dependencies and creates the parent stages
private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  val (shuffleDeps, resourceProfiles) = getShuffleDependenciesAndResourceProfiles(rdd)
  val resourceProfile = mergeResourceProfilesForStage(resourceProfiles)
  checkBarrierStageWithDynamicAllocation(rdd)
  checkBarrierStageWithNumSlots(rdd, resourceProfile)
  checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
  val parents = getOrCreateParentStages(shuffleDeps, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId,
    callSite, resourceProfile.id)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}
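For completeness, getOrCreateParentStages() (abridged from the same source) turns each ShuffleDependency into one ShuffleMapStage, which is why the stage count tracks the shuffle count:

scala
// DAGScheduler.scala (abridged): one ShuffleMapStage per ShuffleDependency
private def getOrCreateParentStages(
    shuffleDeps: HashSet[ShuffleDependency[_, _, _]],
    firstJobId: Int): List[Stage] = {
  shuffleDeps.map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}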

(Figure: stage-division flow.) To summarize:

  • Each shuffle dependency in the RDD chain adds one more stage.
  • Number of stages = number of shuffle dependencies + 1 (see the sketch below).
  • There is exactly one ResultStage, and it is the last stage to execute.
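To sanity-check the formula, here is a minimal sketch that walks an RDD's lineage and counts shuffle boundaries; countShuffles is a hypothetical helper, not part of Spark's API:

scala
import org.apache.spark.ShuffleDependency
import org.apache.spark.rdd.RDD

// Hypothetical helper: recursively count ShuffleDependency edges in the lineage.
def countShuffles(rdd: RDD[_]): Int =
  rdd.dependencies.map {
    case dep: ShuffleDependency[_, _, _] => 1 + countShuffles(dep.rdd)
    case dep                             => countShuffles(dep.rdd)
  }.sum

// For the rdd3 pipeline above: countShuffles(rdd3) == 1, so 1 + 1 = 2 stages.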

6. RDD Task Division

RDD task division distinguishes four levels: Application, Job, Stage, and Task.

  • Application: initializing a SparkContext creates one Application;
  • Job: each action operator triggers one Job;
  • Stage: the number of stages equals the number of wide (ShuffleDependency) dependencies plus 1;
  • Task: within a stage, the number of partitions of the stage's last RDD is the number of Tasks.

Tip

Each level in Application -> Job -> Stage -> Task is a one-to-many relationship, as the sketch below illustrates.
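A minimal sketch, assuming an existing SparkContext `sc` and reusing the pipeline from section 1 (the counts follow the formulas above; note that Spark may skip a shuffle map stage whose output already exists):

scala
// One SparkContext => one Application.
val source = sc.makeRDD(List(1, 3, 5, 4, 2), 2)
val counts = source.map((_, 1)).reduceByKey(_ + _) // one ShuffleDependency

counts.collect() // Job 1: 2 stages (ShuffleMapStage + ResultStage), 2 + 2 = 4 tasks
counts.count()   // Job 2: same shape; the shuffle map stage may be skipped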

6.1 Task Division Source Walkthrough

scala
// DAGScheduler.scala: handleJobSubmitted() ends with a call to submitStage(); step into it
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties): Unit = {
    var finalStage: ResultStage = null
    ......
    submitStage(finalStage)
}
// DAGScheduler.scala: the final stage has no missing parent stages here, so
// `missing` is empty; step into submitMissingTasks()
private def submitStage(stage: Stage): Unit = {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug(s"submitStage($stage (name=${stage.name};" +
      s"jobs=${stage.jobIds.toSeq.sorted.mkString(",")}))")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  ......
}
// DAGScheduler.scala: the final stage is a ResultStage, so the pattern match takes
// the second case; map() does not change the number of elements, so the number of
// `new ResultTask` instances is determined by partitionsToCompute; step into it
private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
    // the sequence of tasks for this stage
    val tasks: Seq[Task[_]] = try {
      val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
      stage match {
        case stage: ShuffleMapStage =>
          stage.pendingPartitions.clear()
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = partitions(id)
            stage.pendingPartitions += id
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber, taskBinary,
              part, stage.numPartitions, locs, properties, serializedTaskMetrics, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
          }
        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, stage.numPartitions, locs, id, properties, serializedTaskMetrics,
              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
              stage.rdd.isBarrier())
          }
      }
    }
    ......
}
// DAGScheduler.scala: findMissingPartitions() is abstract; the stage here is a
// ResultStage, so step into ResultStage.findMissingPartitions()
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

// ResultStage.scala: the RDD's partition count determines the number of tasks
  override def findMissingPartitions(): Seq[Int] = {
    val job = activeJob.get
    (0 until job.numPartitions).filter(id => !job.finished(id))
  }
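For comparison, the ShuffleMapStage version (abridged from the same Spark source) asks the MapOutputTracker which map outputs are missing and falls back to all partitions:

scala
// ShuffleMapStage.scala (abridged): missing partitions come from the
// MapOutputTracker rather than from the ActiveJob.
override def findMissingPartitions(): Seq[Int] = {
  mapOutputTrackerMaster
    .findMissingPartitions(shuffleDep.shuffleId)
    .getOrElse(0 until numPartitions)
}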

(Figure: task division.)
Number of tasks = the number of partitions of the last RDD in the current stage.