Skip to content

RDD分区器

Spark目前支持Hash分区和Range分区,和用户自定义分区。Hash分区为当前的默认分区。分区器直接决定了RDD中分区的个数、RDD中每条数据经过 Shuffle后进入哪个分区,进而决定了Reduce的个数。

  • 只有Key-Value类型的 RDD才有分区器,非Key-Value类型的RDD分区的值是None。
  • 每个RDD的分区ID范围:0 ~ (numPartitions - 1),决定这个值是属于那个分区的。

1. Hash分区

对于给定的 key,计算其 hashCode,并除以分区个数取余。

2. Range分区

将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序。

3. 自定义分区器

scala
def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("MyPartitioner")
    val sc = new SparkContext(sparkConf)
    val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("java", 80), ("android", 30), ("vue3", 60), ("scala", 20),
        ("spark", 30), ("mysql", 60), ("spring", 80), ("javascript", 70)), 3)
    val rdd2: RDD[(String, Int)] = rdd1.partitionBy(new MyPartitioner(3))
    rdd2.saveAsTextFile("output/result")
    sc.stop()
}
class MyPartitioner(num: Int) extends Partitioner{
    override def numPartitions: Int = num

    override def getPartition(key: Any): Int = {
        // 使用模式匹配
        key.toString.charAt(0) match {
            case 's' => 0
            case 'j' => 1
            case _ => 2
        }
    }
}

运行结果:
Alt text