Skip to content

Spark SQL入门

1. Spark SQL概述

Alt textᅟᅠᅟᅠSpark SQL是Spark用于结构化数据(structured data)处理的Spark模块。Spark SQL的主要用来执行SQL查询。Spark SQL支持从现有的Hive中读取数据。交互方式有两种:编程语言中执行和命令行,在代码中执行SQL, 结果将作为Dataset/DataFrame返回;如果是通过命令行就是直接数据进行操作。

2. Spark SQL发展历史

ᅟᅠᅟᅠSpark SQL的前身是Shark,给熟悉RDBMS但又不理解MapReduce的技术人员提供快速上手的工具。Hive是早期唯一运行在Hadoop上的SQL-on-Hadoop工具。但是MapReduce计算过程中大量的中间磁盘落地过程消耗了大量的 I/O,降低的运行效率,为了提高SQL-on-Hadoop的效率,大量的SQL-on-Hadoop工具开始产生,其中表现较为突出的是:

  • Drill
  • Impala
  • Shark

ᅟᅠᅟᅠ其中Shark是伯克利实验室Spark生态环境的组件之一,是基于Hive所开发的工具,它修改了下图所示的右下角的内存管理、物理计划、执行三个模块,并使之能运行在Spark引擎上。下图就是Hive on Spark的早期雏形。
Alt textᅟᅠᅟᅠShark的出现,使得SQL-on-Hadoop的性能比Hive有了10-100倍的提高。随着Spark的发展,对于野心勃勃的Spark团队来说,Shark对于Hive的太多依赖(如采用Hive的语法解析器、查询优化器等等),制约了Spark的One Stack Rule Them All的既定方针,制约了Spark各个组件的相互集成,所以提出了SparkSQL项目。SparkSQL抛弃原有Shark的代码,汲取了Shark的一些优点,如内存列存储、Hive兼容性等,重新开发了SparkSQL代码;由于摆脱了对Hive的依赖性,SparkSQL无论在数据兼容、性能优化、组件扩展方面都得到了极大的方便,真可谓"退一步,海阔天空"。
ᅟᅠᅟᅠ2014年6月1日Shark项目和SparkSQL项目的主持人Reynold Xin宣布:停止对Shark的开发,团队将所有资源放Spark SQL项目上,至此Shark的发展画上了句话,但也因此发展出两个支线:SparkSQL和Hive on Spark。其中 Spark SQL作为Spark生态的一员继续发展,而不再受限于Hive,只是兼容Hive;而Hive on Spark是作为一个Hive的发展计划,该计划将Spark作为Hive的底层引擎之一,也就是说,Hive将不再受限于一个引擎,可以采Map-Reduce、Tez、Spark等引擎。时间来到2022年, Hive on Spark特性却开始被Hive官方移除,Hive官方不再推荐使用Spark作为Hive的计算引擎,你可以选择使用Hive 3.x版本,这是最后一个官方支持Spark作为计算引擎的Hive版本。Hive4.x只支持Tez作为计算引擎。请参考Hive4.0变更总览
对于开发人员来讲,Spark SQL可以简化RDD的开发,提高开发效率,且执行效率非常快,所以实际工作中,基本上采用的就是Spark SQL。

3. Spark SQL特点

3.1 易整合

无缝的整合了SQL查询和Spark编程

3.2 统一的数据访问

使用相同的方式连接不同的数据源

3.3 兼容 Hive

在已有的仓库上直接运行SQL或者HiveQL

3.4 标准数据连接

通过JDBC或者ODBC来连接

4. DataFrame

4.1 DataFrame是什么

ᅟᅠᅟᅠ在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得 Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。
ᅟᅠᅟᅠ同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。从API易用性的角度上DataFrame API提供的是一套高层的关系操作,比函数式的RDD API要更加友好,门槛更低。
Alt textᅟᅠᅟᅠ上图直观地体现了DataFrame和RDD的区别。左侧的RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解Person类的内部结构。而右侧的DataFrame却提供了详细的结构信息,使得Spark SQL可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。
ᅟᅠᅟᅠDataFrame是为数据提供了Schema的视图。可以把它当做数据库中的一张表来对待。DataFrame也是懒执行的,但性能上比RDD要高,主要原因:优化的执行计划,即查询计划通过Spark catalyst optimiser进行优化。比如下面一个例子: Alt textᅟᅠᅟᅠ为了说明查询优化,我们来看上图展示的人口数据分析的示例。图中构造了两个DataFrame,将它们join之后又做了一次filter操作。如果原封不动地执行这个执行计划,最终的执行效率是不高的。因为join是一个代价较大的操作,也可能会产生一个较大的数据集。如果我们能将filter下推到join下方,先对DataFrame进行过滤,再join过滤后的较小的结果集,便可以有效缩短执行时间。而Spark SQL的查询优化器正是这样做的。简而言之,逻辑查询计划优化就是一个利用基于关系代数的等价变换,将高成本的操作替换为低成本操作的过程。

4.2 创建DataFrame

在Spark SQL中SparkSession是创建DataFrame和执行SQL的入口,创建DataFrame有三种方式:通过Spark的数据源进行创建;从一个存在的RDD进行转换;还可以从Hive Table进行查询返回。

  1. 从Spark数据源进行创建
sh
[jack@hadoop103 ~]$ spark-shell
## 查看Spark支持创建文件的数据源格式(最后按TAB健进行提示)
scala> spark.read.
csv   format   jdbc   json   load   option   options   orc   parquet   schema   table   text   textFile

提示

如果从内存中获取数据,Spark可以知道数据类型具体是什么。如果是数字,默认作为Int处理;但是从文件中读取的数字,不能确定是什么类型,所以用bigint接收,可以和Long类型转换,但是和Int不能进行转换

5. DataSet

5.1 DataSet是什么

DataSet是分布式数据集合。DataSet是Spark 1.6中添加的一个新抽象,是DataFrame的一个扩展。它提供了RDD的优势(强类型,使用强大的lambda函数的能力)以及Spark SQL优化执行引擎的优点。DataSet也可以使用功能性的转换(操作 map,flatMap,filter等等)
DataSet是DataFrame API的一个扩展,是Spark SQL最新的数据抽象
➢ 用户友好的API风格,既具有类型安全检查也具有DataFrame的查询优化特性;
➢ 用样例类来对DataSet中定义数据的结构信息,样例类中每个属性的名称直接映射到DataSet中的字段名称;
➢ DataSet是强类型的。比如可以有 DataSet[Car],DataSet[Person]。
➢ DataFrame是DataSet的特列,DataFrame=DataSet[Row] ,所以可以通过as方法将DataFrame转换为 DataSet。Row是一个类型,跟Car、Person这些的类型一样,所有的表结构信息都用Row来表示。获取数据时需要指定顺序

5.2 创建DataSet

  1. 使用样例类序列创建DataSet
sh
scala> case class Person(name: String, age: Long)
defined class Person
scala> val caseClassDS = Seq(Person("zhangsan",2)).toDS()
caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: Long]
scala> caseClassDS.show()
+--------+---+
|    name|age|
+--------+---+
|zhangsan|  2|
+--------+---+
  1. 使用基本类型的序列创建DataSet
sh
scala> val ds = Seq(1,2,3,4,5).toDS
ds: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> ds.show
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
|    5|
+-----+

提示

在实际使用的时候,很少用到把序列转换成DataSet,更多的是通过RDD来得到DataSet