Spark Quick Start
1. Create a Maven Project
1.1 Add the Scala Plugin
Spark is developed in Scala, and the code in these notes is written in Scala as well. The Spark version used here is 3.4.2, which is built against Scala 2.12 by default. IntelliJ IDEA provides a Scala plugin for Scala development; make sure it is installed.
1.2 Add Maven Dependencies
Modify the POM file of the Maven project and add the dependency on the Spark framework:
xml
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.4.2</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <!-- This plugin compiles Scala code into class files -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>4.8.1</version>
            <executions>
                <execution>
                    <!-- Bind the Scala compiler to Maven's compile and test-compile phases -->
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.6.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
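With this build configuration in place, running mvn package compiles the Scala sources and, in addition to the regular jar, should produce a *-jar-with-dependencies.jar under target/; that fat jar is what you would typically pass to spark-submit when running the job on a cluster.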
1.3 Configure the Project's Scala SDK
Visit the Scala website: https://www.scala-lang.org/download/2.12.19.html and download the Scala installation package. After downloading and installing it, configure the Scala SDK in the Maven project:
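To confirm that the configured SDK matches the expected Scala version, you can run a tiny check from the new project; a minimal sketch (the object name VersionCheck is purely illustrative):
scala
object VersionCheck {
  def main(args: Array[String]): Unit = {
    // Should print something like "version 2.12.19" if the 2.12 SDK is configured correctly.
    println(scala.util.Properties.versionString)
  }
}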
2. WordCount Example
Create a Demo object:
scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo {
  def main(args: Array[String]): Unit = {
    // Establish a connection to the Spark framework
    val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(sparkConf)
    // Business logic
    // 1. Read the files and get the data line by line
    val lines: RDD[String] = sc.textFile("datas")
    // 2. Split each line into individual words
    //    flatMap: flatten the whole into individual elements
    val words: RDD[String] = lines.flatMap(_.split(" "))
    // 3. Transform the data structure: word => (word, 1)
    val word2OneRDD: RDD[(String, Int)] = words.map((_, 1))
    // 4. Group the transformed data by word and aggregate the counts
    val word2CountRDD: RDD[(String, Int)] = word2OneRDD.reduceByKey(_ + _)
    // 5. Collect the aggregated result into the driver's memory
    val result: Array[(String, Int)] = word2CountRDD.collect()
    // 6. Print the result
    result.foreach(println)
    sc.stop()
  }
}
Create a datas directory at the same level as src, and create a file 1.txt inside it:
txt
Hello Spark
Hello Scala
Hello Spark
Hello Spark
Hello Spark
Hello Scala
Hello Scala
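For comparison, the same count can also be expressed with groupBy instead of reduceByKey. The following is a minimal sketch under the same assumptions as the example above (local master, a datas directory next to src); reduceByKey is usually preferred because it pre-aggregates within each partition before shuffling:
scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCountGroupBy {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("WordCountGroupBy"))
    // Read all files in the datas directory and split every line into words
    val words: RDD[String] = sc.textFile("datas").flatMap(_.split(" "))
    // Group identical words together, then count the size of each group;
    // unlike reduceByKey, groupBy ships every record across the shuffle.
    val wordCounts: RDD[(String, Int)] = words.groupBy(word => word).map {
      case (word, group) => (word, group.size)
    }
    wordCounts.collect().foreach(println)
    sc.stop()
  }
}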
A large amount of log output is produced during execution. To make the program's result easier to read, you can create a log4j2.properties file in the project's resources directory and add the logging configuration below (Spark 3.x uses Log4j 2, so the file name and syntax follow Log4j 2 conventions).
properties
rootLogger.level = error
rootLogger.appenderRef.stdout.ref = console

appender.console.type = Console
appender.console.name = console
appender.console.target = SYSTEM_ERR
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Set the default spark-shell log level to ERROR. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
logger.repl.name = org.apache.spark.repl.Main
logger.repl.level = error

# Settings to quiet third party logs that are too verbose
logger.jetty.name = org.sparkproject.jetty
logger.jetty.level = error
logger.parquet.name = org.apache.parquet
logger.parquet.level = error

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
logger.metastore.name = org.apache.hadoop.hive.metastore.RetryingHMSHandler
logger.metastore.level = fatal
logger.hive_functionregistry.name = org.apache.hadoop.hive.ql.exec.FunctionRegistry
logger.hive_functionregistry.level = error
Run result:
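With the sample 1.txt above, the console should print (Hello,7), (Spark,4) and (Scala,3); the order of the tuples is not guaranteed and may differ between runs.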