SparkSQL的基本使用
1. SparkSession简介
Spark SQL使用全新的上下文对象SparkSession,SparkSession内部封装了SparkContext,当我们使用spark-shell的时候, Spark框架会自动的创建一个名称叫做spark的SparkSession对象, 就像我们以前可以自动获取到一个sc来表示SparkContext对象一样。
2. SQL语法
在data目录下创建user.json文件,需要注意的是Spark读取的JSON文件不是传统的JSON文件,每一行都应该是一个JSON串。
sh
# 读取JSON文件创建DataFrame
scala> val df = spark.read.json("data/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]
# 对DataFrame创建一个临时表
scala> df.createOrReplaceTempView("people")
# 通过SQL语句实现查询全表
scala> val sqlDF = spark.sql("SELECT * FROM people").show
+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
| 30| lisi|
| 40| wangwu|
+---+--------+
普通临时表是Session范围内的,如果想应用范围内有效,可以使用全局临时表。使用全局临时表时需要全路径访问,如:global_temp.people
sh
# 对于DataFrame创建一个全局表
scala> df.createGlobalTempView("people")
# 通过SQL语句实现查询全表
scala> spark.sql("SELECT * FROM global_temp.people").show()
+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
| 30| lisi|
| 40| wangwu|
+---+--------+
# 开启新的会话
scala> spark.newSession().sql("SELECT * FROM global_temp.people").show()
+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
| 30| lisi|
| 40| wangwu|
+---+--------+
3. DSL语法
DataFrame提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据。
sh
scala> val df = spark.read.json("data/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
# 查看DataFrame的Schema信息
scala> df.printSchema
root
|-- age: Long (nullable = true)
|-- username: string (nullable = true)
# 只查看"username"列数据
scala> df.select("username").show()
+--------+
|username|
+--------+
|zhangsan|
| lisi|
| wangwu|
+--------+
涉及到运算的时候, 每列都必须使用$, 或者采用引号表达式:单引号+字段名
sh
# 查看"username"列数据以及"age+1"数据
scala> df.select('username, 'age + 1 as "newage").show()
+---------+------+
| username|newage|
+---------+------+
|zhangshan| 21|
| lisi| 27|
| zhaoliu| 23|
+---------+------+
# 按照"age"分组,查看数据条数
scala> df.groupBy("age").count.show()
+---+-----+
|age|count|
+---+-----+
| 26| 1|
| 22| 1|
| 18| 1|
| 20| 1|
+---+-----+