Skip to content

RDD 持久化

1. RDD Cache缓存

Alt text 由于RDD中不存储数据,如果一个RDD需要重复使用,那么需要从头再次执行来获取数据,也就是RDD对象可以重用的,但是数据无法重用,这时可以利用RDD提供的缓存功能。
RDD 通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action算子时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

1.1 数据缓存方法

  1. cache() 保存在内存
  2. persist()或者persist(StorageLevel) 默认保存在内存,或者指定保存级别进行存储。
    查看源码可以看到cache()实际也是调用的persist():
scala
def cache(): this.type = persist()
// persist() 保存到内存中
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

1.2 存储级别

Alt text 比如StorageLevel.DISK_ONLY_3表示直接保存到磁盘,并保存三份副本。

scala
def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    val rdd1 = sc.makeRDD(List(1, 3, 5, 4, 2, 4, 3), 2)
    val rdd2: RDD[(Int, Int)] = rdd1.map(it=>{
        println("@@@@@@@")
        (it, 1)
    })
    rdd2.cache()  // 默认只能在内存中保存,如果内存满了也不会溢写到磁盘
    println(rdd2.toDebugString)
    val rdd3: RDD[(Int, Int)] = rdd2.reduceByKey(_ + _)
    // 缓存数据并不会马上执行,而是有行动算子触发
    rdd3.collect().foreach(println)
    println("***********************")
    val rdd4: RDD[(Int, Int)] = rdd2.groupByKey().map((k) => {
        (k._1, k._2.size)
    })
    println(rdd2.toDebugString)
    rdd4.collect().foreach(println)
    sc.stop()
}

运行结果:
Alt text 可以看出RDD的之间的血缘关系中会添加新的缓存关系。
缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
Spark会自动对一些Shuffle操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点Shuffle失败了避免重新计算整个输入。但是在实际使用的时候,如果想重用数据,仍然建议调用persist或cache。

2. RDD CheckPoint检查点

所谓的检查点其实就是通过将RDD中间结果写入磁盘,由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。

scala
def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    sc.setCheckpointDir("checkpoint")
    val rdd1 = sc.makeRDD(List(1, 3, 5, 4, 2, 4, 3), 2)
    val rdd2: RDD[(Int, Int)] = rdd1.map(it => {
        println("@@@@@@@")
        (it, 1)
    })
    rdd2.checkpoint() // 设置检查点,任务执行完不会被自动删除
    val rdd3: RDD[(Int, Int)] = rdd2.reduceByKey(_ + _)
    rdd3.collect().foreach(println)
    println("***********************")
    val rdd4: RDD[(Int, Int)] = rdd2.groupByKey().map((k) => {
        (k._1, k._2.size)
    })
    rdd4.collect().foreach(println)
    sc.stop()
}

运行结果:
Alt text 会发现使用检查点,"@@@@@@@"打印变多了,并不是检查点缓存不可用,而是因为检查点缓存需要单独开启一个task写到磁盘中。此外checkpoint会切断之前的血缘关系,重新建立血缘关系。
Alt text 查看checkpoint源码:

scala
// 从collect()行动算子进入 
rdd4.collect()

// 进入runJob() 
  def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }
//  SparkContext.scala 进入多层代码直到进入doCheckpoint()
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
    ......
    rdd.doCheckpoint()
}        

// RDD.scala  因为我们代码写了checkpoint,所以checkpointData.isDefined为true, 进入checkpointData.get.checkpoint()方法中
  private[spark] def doCheckpoint(): Unit = {
    RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
      if (!doCheckpointCalled) {
        doCheckpointCalled = true
        if (checkpointData.isDefined) {
          ......
          checkpointData.get.checkpoint()
        } 
        ......
  }
//  RDDCheckpointData.scala  进入doCheckpoint()方法
  final def checkpoint(): Unit = {
    ......
    val newRDD = doCheckpoint()
    ......
  }
// RDDCheckpointData.scala  doCheckpoint()是一个抽象方法,Ctrl+h选择实现类LocalRDDCheckpointData.scala进入
 protected def doCheckpoint(): CheckpointRDD[T]    
// LocalRDDCheckpointData.scala  可以看到执行runJob()方法, 检查点单独作为任务执行
protected override def doCheckpoint(): CheckpointRDD[T] = {
    val level = rdd.getStorageLevel
    if (missingPartitionIndices.nonEmpty) {
      rdd.sparkContext.runJob(rdd, action, missingPartitionIndices)
    }
    new LocalCheckpointRDD[T](rdd)
}

2.1 缓存和检查点区别

  1. Cache缓存只是将数据保存起来,不切断血缘依赖。Checkpoint检查点切断血缘依赖。
  2. Cache缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint的数据通常存储在HDFS等容错、高可用的文件系统,可靠性高。
  3. 建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job只需从Cache缓存中读取数据即可,否则需要再从头计算一次RDD。