Skip to content

RDD|DataFrame|DataSet三者关系

1. RDD、DataFrame、DataSet的发展

在SparkSQL中Spark为我们提供了两个新的抽象,分别是DataFrame和DataSet。他们和RDD有什么区别呢?首先从版本的产生上来看:
➢ Spark1.0 => RDD
➢ Spark1.3 => DataFrame
➢ Spark1.6 => Dataset
在后期的Spark版本中,DataSet有可能会逐步取代 RDD和DataFrame成为唯一的API接口。

2. 三者的共性

  1. RDD、DataFrame、DataSet都是Spark平台下的分布式弹性数据集,为处理超大型数据提供便利;
  2. 三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到行动算子如foreach时,三者才会开始遍历运算;
  3. 三者有许多共同的函数,如filter,排序等;
  4. 在对DataFrame和Dataset进行操作许多操作都需要这个包:import spark.implicits._(在创建好SparkSession 对象后尽量直接导入)
  5. 三者都会根据Spark的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出
  6. 三者都有partition的概念
  7. DataFrame和DataSet均可使用模式匹配获取各个字段的值和类型

3. 三者的区别

3.1 RDD

RDD不支持SparkSql操作

3.2 DataFrame

  1. 与RDD和Dataset不同,DataFrame每一行的类型固定为Row,每一列的值没法直接访问,只有通过解析才能获取各个字段的值。
  2. DataFrame与DataSet均支持SparkSQL的操作,比如select, groupby之类,还能注册临时表/视窗,进行sql语句操作。
  3. DataFrame与DataSet支持一些特别方便的保存方式,比如保存成csv,可以带上表头。

3.3 DataSet

  1. Dataset和DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型不同。DataFrame其实就是 DataSet 的一个特例, 源码中使用别名:type DataFrame = Dataset[Row]
  2. DataFrame也可以叫Dataset[Row],每一行的类型是Row,不解析的话,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的getAS方法或者使用模式匹配拿出特定字段。而Dataset中,每一行是什么类型是不一定的,在自定义了 case class之后可以很自由的获得每一行的信息。

3.4 三者的互相转换

Alt text

4. RDD转换为DataFrame

使用toDF函数实现转化

sh
scala> val idRDD = sc.textFile("data/id.txt")
scala> idRDD.toDF("id").show
+---+
| id|
+---+
| 1|
| 2|
| 3|
| 4|
+---+

实际开发中,一般通过样例类将RDD转换为DataFrame

sh
scala> case class User(name:String, age:Int)
defined class User
scala> sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1,
     | t._2)).toDF.show()
+--------+---+
|    name|age|
+--------+---+
|zhangsan| 30|
|    lisi| 40|
+--------+---+

5. DataFrame转换为RDD

DataFrame其实就是对RDD的封装,所以可以直接获取内部的RDD

sh
scala> val df = sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1, 
t._2)).toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> val rdd = df.rdd
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[46] 
at rdd at <console>:25
scala> rdd.foreach(println)
[zhangsan,30]
[lisi,40]

6. RDD转换为DataSet

SparkSQL能够自动将包含有case类的RDD转换成DataSet, case类定义了table的结构, case类属性通过反射变成了表的列名。case类可以包含诸如Seq或者Array等复杂的结构。

sh
scala> val ds1 = sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDS
ds1: org.apache.spark.sql.Dataset[User] = [name: string, age: int]

scala> ds1.show
+--------+---+
|    name|age|
+--------+---+
|zhangsan| 30|
|    lisi| 49|
+--------+---+

7. DataSet转换为RDD

sh
scala> val rdd = ds1.rdd
rdd: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[51] at rdd at 
<console>:25

8. DataFrame转换为DataSet

sh
scala> val ds = df.as[User]
ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]

9. DataSet转换为DataFrame

sh
scala> val df = ds.toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: int]