RDD Dependencies
1. RDD Lineage
An RDD only supports coarse-grained transformations, i.e., a single operation applied to a large number of records. The sequence of transformations used to build an RDD is recorded as its Lineage, so that lost partitions can be recovered. An RDD's Lineage records the RDD's metadata and the transformations that produced it; when some of the RDD's partition data is lost, Spark can use this information to recompute and recover the lost partitions. Print the lineage recorded by each RDD:
scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd1 = sc.makeRDD(List(1,3,5,4,2), 2)
println(rdd1.toDebugString)
println("--------------------------------------")
val rdd2: RDD[(Int, Int)] = rdd1.map((_, 1))
println(rdd2.toDebugString)
println("--------------------------------------")
val rdd3: RDD[(Int, Int)] = rdd2.reduceByKey(_ + _)
println(rdd3.toDebugString)
println("--------------------------------------")
sc.stop()
}
Output:
2. RDD Dependencies
A dependency is simply the relationship between two adjacent RDDs.
View the dependencies of each RDD:
scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd1 = sc.makeRDD(List(1,3,5,4,2), 2)
println(rdd1.dependencies)
println("--------------------------------------")
val rdd2: RDD[(Int, Int)] = rdd1.map((_, 1))
println(rdd2.dependencies)
println("--------------------------------------")
val rdd3: RDD[(Int, Int)] = rdd2.reduceByKey(_ + _)
println(rdd3.dependencies)
println("--------------------------------------")
sc.stop()
}
Output:
3. RDD Narrow Dependency
A narrow dependency means that each partition of the parent (upstream) RDD is used by at most one partition of the child (downstream) RDD. A narrow dependency can be pictured as an "only child".
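In Spark's source, a narrow dependency is modeled by NarrowDependency and its subclasses; the most common case is OneToOneDependency (produced by operators such as map and filter). The excerpt below is abridged from Dependency.scala and may differ slightly between Spark versions:
scala
// Dependency.scala (abridged): a narrow dependency maps each child partition
// to a small, fixed set of parent partitions
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  // the partitions of the parent RDD that the given child partition depends on
  def getParents(partitionId: Int): Seq[Int]
  override def rdd: RDD[T] = _rdd
}

// One-to-one case: child partition i depends only on parent partition i
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}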
4. RDD Wide Dependency
A wide dependency means that the same partition of a parent (upstream) RDD is depended on by multiple partitions of the child (downstream) RDD, which triggers a shuffle. In short, a wide dependency can be pictured as a family with "multiple children". In Spark it is represented by ShuffleDependency:
scala
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
@transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Serializer = SparkEnv.get.serializer,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false)
extends Dependency[Product2[K, V]]
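At runtime, the two kinds of dependency can be told apart by pattern matching on an RDD's dependencies. This is a minimal sketch; the helper name describeDeps is illustrative and not part of Spark's API:
scala
import org.apache.spark.{Dependency, NarrowDependency, ShuffleDependency}
import org.apache.spark.rdd.RDD

// Illustrative helper: report whether each dependency of an RDD is narrow or wide
def describeDeps(rdd: RDD[_]): Unit =
  rdd.dependencies.foreach {
    case s: ShuffleDependency[_, _, _] => println(s"wide (shuffle): $s")
    case n: NarrowDependency[_]        => println(s"narrow: $n")
    case d: Dependency[_]              => println(s"other: $d")
  }

// With the RDDs from section 2:
// describeDeps(rdd2)  // narrow: a OneToOneDependency
// describeDeps(rdd3)  // wide (shuffle): a ShuffleDependency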
5. RDD Stage Division
For RDDs with wide dependencies, a shuffle is required, so splitting the computation into stages before and after the shuffle makes it easier to schedule and execute the RDD's tasks.
5.1 RDD Stage Division: Source Code Walkthrough
scala
// Start from the collect() method
rdd3.collect()
// RDD.scala: step into runJob()
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
// SparkContext.scala: step into runJob()
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
runJob(rdd, func, rdd.partitions.indices) // one more layer of wrapping
}
// SparkContext.scala: step into runJob()
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: Iterator[T] => U,
partitions: Seq[Int]): Array[U] = {
val cleanedFunc = clean(func)
runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}
// SparkContext.scala: step into runJob()
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int]): Array[U] = {
val results = new Array[U](partitions.size)
runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
results
}
// SparkContext.scala: step into dagScheduler.runJob()
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
.......
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}
// DAGScheduler.scala: step into submitJob()
def runJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): Unit = {
val start = System.nanoTime
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
......
}
// DAGScheduler.scala: eventProcessLoop.post() sends a JobSubmitted event; find the method that handles JobSubmitted
def submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {
val maxPartitions = rdd.partitions.length
......
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
Utils.cloneProperties(properties)))
waiter
}
// DAGScheduler.scala: createResultStage() is where the RDD stages are created; step into createResultStage()
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties): Unit = {
var finalStage: ResultStage = null
try {
// New stage creation may throw an exception if, for example, jobs are run on a
// HadoopRDD whose underlying HDFS files have been deleted.
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
......
}
......
}
// DAGScheduler.scala: getOrCreateParentStages() builds the parent (upstream) stages from the shuffle dependencies, then the ResultStage is created on top of them
private def createResultStage(
rdd: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
jobId: Int,
callSite: CallSite): ResultStage = {
val (shuffleDeps, resourceProfiles) = getShuffleDependenciesAndResourceProfiles(rdd)
val resourceProfile = mergeResourceProfilesForStage(resourceProfiles)
checkBarrierStageWithDynamicAllocation(rdd)
checkBarrierStageWithNumSlots(rdd, resourceProfile)
checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
val parents = getOrCreateParentStages(shuffleDeps, jobId)
val id = nextStageId.getAndIncrement()
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId,
callSite, resourceProfile.id)
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}
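The key step is getOrCreateParentStages: every shuffle dependency found above the final RDD becomes exactly one ShuffleMapStage (created only if it does not already exist). Paraphrased rather than quoted verbatim, since the exact signature differs between Spark versions, it amounts to:
scala
// DAGScheduler.scala (paraphrased): one ShuffleMapStage per shuffle dependency
private def getOrCreateParentStages(
    shuffleDeps: HashSet[ShuffleDependency[_, _, _]],
    firstJobId: Int): List[Stage] = {
  shuffleDeps.map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}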
The flow can be illustrated with a diagram. In summary:
- Whenever a shuffle dependency exists in the RDD chain, one more stage is added.
- Number of stages = number of shuffle dependencies + 1 (see the sketch after this list).
- There is exactly one ResultStage, and it is the last stage to execute.
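A rough way to confirm the "shuffle dependencies + 1" rule without reading DAGScheduler is to register a SparkListener and count the submitted stages. A minimal sketch, assuming the RDD chain from section 1 (one reduceByKey, hence one shuffle dependency) and the sc from the earlier examples:
scala
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageSubmitted}

// Count submitted stages with a listener: expect 2 (one ShuffleMapStage + one ResultStage)
val stageCount = new AtomicInteger(0)
sc.addSparkListener(new SparkListener {
  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit =
    stageCount.incrementAndGet()
})

sc.makeRDD(List(1, 3, 5, 4, 2), 2)
  .map((_, 1))
  .reduceByKey(_ + _)
  .collect()

// Listener events are delivered asynchronously, so wait briefly before reading the counter
Thread.sleep(1000)
println(s"stages submitted: ${stageCount.get()}")  // expected: 2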
6. RDD Task Division
RDD task division involves four levels: Application, Job, Stage, and Task.
- Application: initializing a SparkContext creates one Application;
- Job: each action operator generates one Job;
- Stage: the number of Stages equals the number of wide dependencies (ShuffleDependency) plus 1;
- Task: within a Stage, the number of partitions of the last RDD is the number of Tasks for that stage.
Tip
Each level in Application -> Job -> Stage -> Task is a one-to-many (1:n) relationship; see the sketch below.
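To make the one-to-many relationship concrete, here is how the counts work out for the small job from section 1, using the sc from the earlier examples (and assuming spark.default.parallelism is not set explicitly, so reduceByKey keeps its parent's 2 partitions):
scala
// 1 Application: one SparkContext
// 1 Job:         one action (collect)
// 2 Stages:      one ShuffleDependency from reduceByKey, so 1 + 1 = 2
// 4 Tasks total: ShuffleMapStage -> rdd2 has 2 partitions -> 2 ShuffleMapTasks
//                ResultStage     -> rdd3 has 2 partitions -> 2 ResultTasks
val rdd1 = sc.makeRDD(List(1, 3, 5, 4, 2), 2)
val rdd2 = rdd1.map((_, 1))
val rdd3 = rdd2.reduceByKey(_ + _)
rdd3.collect()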
6.1 RDD Task Division: Source Code
scala
// DAGScheduler.scala: handleJobSubmitted() ends with a call to submitStage(); step into submitStage()
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties): Unit = {
var finalStage: ResultStage = null
......
submitStage(finalStage)
}
// DAGScheduler.scala: when a stage has no missing parent stages, missing is empty and submitMissingTasks() is called; otherwise the parents are submitted first. Step into submitMissingTasks()
private def submitStage(stage: Stage): Unit = {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
logDebug(s"submitStage($stage (name=${stage.name};" +
s"jobs=${stage.jobIds.toSeq.sorted.mkString(",")}))")
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing.isEmpty) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
submitMissingTasks(stage, jobId.get)
} else {
for (parent <- missing) {
submitStage(parent)
}
waitingStages += stage
}
}
......
}
// DAGScheduler.scala: the final stage is a ResultStage, so the pattern match takes the second case. map() does not change the number of elements, so the number of new ResultTask instances is determined by partitionsToCompute; step into partitionsToCompute
private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
// the sequence of tasks for this stage
val tasks: Seq[Task[_]] = try {
val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
stage match {
case stage: ShuffleMapStage =>
stage.pendingPartitions.clear()
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = partitions(id)
stage.pendingPartitions += id
new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber, taskBinary,
part, stage.numPartitions, locs, properties, serializedTaskMetrics, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
}
case stage: ResultStage =>
partitionsToCompute.map { id =>
val p: Int = stage.partitions(id)
val part = partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, stage.numPartitions, locs, id, properties, serializedTaskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
stage.rdd.isBarrier())
}
}
}
......
}
// DAGScheduler.scala: findMissingPartitions() is abstract in Stage; since the stage here is a ResultStage, step into ResultStage.findMissingPartitions()
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
// ResultStage.scala: the RDD's partition count determines the number of tasks
override def findMissingPartitions(): Seq[Int] = {
val job = activeJob.get
(0 until job.numPartitions).filter(id => !job.finished(id))
}
As illustrated in the figure: number of tasks = number of partitions of the last RDD in the current stage.
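As a quick check, the partition count of the final RDD (and hence the ResultStage's task count) can be read off directly with getNumPartitions. In this illustrative example, run with the sc from the earlier examples, reduceByKey is asked for 3 output partitions:
scala
// The last RDD of the ResultStage has 3 partitions, so the ResultStage runs 3 ResultTasks
val reduced = sc.makeRDD(1 to 10, 4)
  .map((_, 1))
  .reduceByKey(_ + _, 3)   // explicitly request 3 output partitions
println(reduced.getNumPartitions)  // 3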