RDD|DataFrame|DataSet三者关系
1. RDD、DataFrame、DataSet的发展
在SparkSQL中Spark为我们提供了两个新的抽象,分别是DataFrame和DataSet。他们和RDD有什么区别呢?首先从版本的产生上来看:
➢ Spark1.0 => RDD
➢ Spark1.3 => DataFrame
➢ Spark1.6 => Dataset
在后期的Spark版本中,DataSet有可能会逐步取代 RDD和DataFrame成为唯一的API接口。
2. 三者的共性
- RDD、DataFrame、DataSet都是Spark平台下的分布式弹性数据集,为处理超大型数据提供便利;
- 三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到行动算子如foreach时,三者才会开始遍历运算;
- 三者有许多共同的函数,如filter,排序等;
- 在对DataFrame和Dataset进行操作许多操作都需要这个包:
import spark.implicits._
(在创建好SparkSession 对象后尽量直接导入) - 三者都会根据Spark的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出
- 三者都有partition的概念
- DataFrame和DataSet均可使用模式匹配获取各个字段的值和类型
3. 三者的区别
3.1 RDD
RDD不支持SparkSql操作
3.2 DataFrame
- 与RDD和Dataset不同,DataFrame每一行的类型固定为Row,每一列的值没法直接访问,只有通过解析才能获取各个字段的值。
- DataFrame与DataSet均支持SparkSQL的操作,比如select, groupby之类,还能注册临时表/视窗,进行sql语句操作。
- DataFrame与DataSet支持一些特别方便的保存方式,比如保存成csv,可以带上表头。
3.3 DataSet
- Dataset和DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型不同。DataFrame其实就是 DataSet 的一个特例, 源码中使用别名:
type DataFrame = Dataset[Row]
- DataFrame也可以叫Dataset[Row],每一行的类型是Row,不解析的话,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的getAS方法或者使用模式匹配拿出特定字段。而Dataset中,每一行是什么类型是不一定的,在自定义了 case class之后可以很自由的获得每一行的信息。
3.4 三者的互相转换
4. RDD转换为DataFrame
使用toDF函数实现转化
sh
scala> val idRDD = sc.textFile("data/id.txt")
scala> idRDD.toDF("id").show
+---+
| id|
+---+
| 1|
| 2|
| 3|
| 4|
+---+
实际开发中,一般通过样例类将RDD转换为DataFrame
sh
scala> case class User(name:String, age:Int)
defined class User
scala> sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1,
| t._2)).toDF.show()
+--------+---+
| name|age|
+--------+---+
|zhangsan| 30|
| lisi| 40|
+--------+---+
5. DataFrame转换为RDD
DataFrame其实就是对RDD的封装,所以可以直接获取内部的RDD
sh
scala> val df = sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1,
t._2)).toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> val rdd = df.rdd
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[46]
at rdd at <console>:25
scala> rdd.foreach(println)
[zhangsan,30]
[lisi,40]
6. RDD转换为DataSet
SparkSQL能够自动将包含有case类的RDD转换成DataSet, case类定义了table的结构, case类属性通过反射变成了表的列名。case类可以包含诸如Seq或者Array等复杂的结构。
sh
scala> val ds1 = sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDS
ds1: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
scala> ds1.show
+--------+---+
| name|age|
+--------+---+
|zhangsan| 30|
| lisi| 49|
+--------+---+
7. DataSet转换为RDD
sh
scala> val rdd = ds1.rdd
rdd: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[51] at rdd at
<console>:25
8. DataFrame转换为DataSet
sh
scala> val ds = df.as[User]
ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
9. DataSet转换为DataFrame
sh
scala> val df = ds.toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: int]