RDD分区器
Spark目前支持Hash分区和Range分区,和用户自定义分区。Hash分区为当前的默认分区。分区器直接决定了RDD中分区的个数、RDD中每条数据经过 Shuffle后进入哪个分区,进而决定了Reduce的个数。
- 只有Key-Value类型的 RDD才有分区器,非Key-Value类型的RDD分区的值是None。
- 每个RDD的分区ID范围:0 ~ (numPartitions - 1),决定这个值是属于那个分区的。
1. Hash分区
对于给定的 key,计算其 hashCode,并除以分区个数取余。
2. Range分区
将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序。
3. 自定义分区器
scala
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("MyPartitioner")
val sc = new SparkContext(sparkConf)
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("java", 80), ("android", 30), ("vue3", 60), ("scala", 20),
("spark", 30), ("mysql", 60), ("spring", 80), ("javascript", 70)), 3)
val rdd2: RDD[(String, Int)] = rdd1.partitionBy(new MyPartitioner(3))
rdd2.saveAsTextFile("output/result")
sc.stop()
}
class MyPartitioner(num: Int) extends Partitioner{
override def numPartitions: Int = num
override def getPartition(key: Any): Int = {
// 使用模式匹配
key.toString.charAt(0) match {
case 's' => 0
case 'j' => 1
case _ => 2
}
}
}
运行结果: