
Spark Streaming in Practice

1. Environment and Data Preparation

1.1 Add Dependencies

xml
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.33</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.4.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.4.2</version>
</dependency>
<dependency>
    <groupId>com.zaxxer</groupId>
    <artifactId>HikariCP</artifactId>
    <version>6.1.0</version>
</dependency>

1.2 Utility Class

Write a utility for reading the configuration file:

scala
import java.io.InputStreamReader
import java.util.Properties

object PropertiesUtil {
    def load(propFile: String): Properties = {
        val prop = new Properties()
        // Read the properties file from the classpath as UTF-8
        val streamReader =
            new InputStreamReader(Thread.currentThread().getContextClassLoader.getResourceAsStream(propFile), "UTF-8")
        prop.load(streamReader)
        prop
    }
}
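
A quick usage check for the utility (a minimal sketch; kafka.broker.list is defined in config.properties below):

scala
import java.util.Properties

// Load config.properties from the classpath and read one key
val config: Properties = PropertiesUtil.load("config.properties")
println(config.getProperty("kafka.broker.list"))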

1.3 Configuration File

config.properties:

ini
# JDBC configuration
jdbc.datasource.size=10
jdbc.url=jdbc:mysql://hadoop102:3306/spark2024?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true&useSSL=false
jdbc.user=root
jdbc.password=123456
# Kafka configuration
kafka.broker.list=hadoop102:9092,hadoop103:9092,hadoop104:9092

1.4 Generating Mock Data

scala
/**
 * City information
 * @param city_id   city id
 * @param city_name city name
 * @param area      region the city belongs to
 */
case class CityInfo (city_id:Long,
                     city_name:String,
                     area:String)
scala
import scala.collection.mutable.ListBuffer
import scala.util.Random

// A candidate value together with its sampling weight
case class RanOpt[T](value: T, weight: Int)

object RandomOptions {
    def apply[T](opts: RanOpt[T]*): RandomOptions[T] = {
        val randomOptions = new RandomOptions[T]()
        for (opt <- opts) {
            randomOptions.totalWeight += opt.weight
            for (i <- 1 to opt.weight) {
                randomOptions.optsBuffer += opt.value
            }
        }
        randomOptions
    }
}

class RandomOptions[T](opts: RanOpt[T]*) {
    var totalWeight = 0
    var optsBuffer = new ListBuffer[T]

    def getRandomOpt: T = {
        val randomNum: Int = new Random().nextInt(totalWeight)
        optsBuffer(randomNum)
    }
}
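
As a quick sanity check, the weighted pool can be sampled like this (a minimal sketch; the values and weights are arbitrary):

scala
// With weights 3:1, roughly three quarters of the draws should return "A"
val opts = RandomOptions(RanOpt("A", 3), RanOpt("B", 1))
println((1 to 10).map(_ => opts.getRandomOpt).mkString(" "))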
scala
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import java.util.Properties
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

object MockerRealTime {
    /**
     * Mock data
     * Format: timestamp area city userid adid
     * i.e. a point in time, a region, a city, a user id, an ad id
     */
    def generateMockData(): Array[String] = {
        val array: ArrayBuffer[String] = ArrayBuffer[String]()
        val CityRandomOpt = RandomOptions(RanOpt(CityInfo(1, "北京", "华北"), 30),
            RanOpt(CityInfo(2, "上海", "华东"), 30),
            RanOpt(CityInfo(3, "广州", "华南"), 10),
            RanOpt(CityInfo(4, "深圳", "华南"), 20),
            RanOpt(CityInfo(5, "天津", "华北"), 10))
        val random = new Random()
        // Generate mock real-time data:
        // timestamp area city userid adid
        for (i <- 0 to 50) {
            val timestamp: Long = System.currentTimeMillis()
            val cityInfo: CityInfo = CityRandomOpt.getRandomOpt
            val city: String = cityInfo.city_name
            val area: String = cityInfo.area
            val adid: Int = 1 + random.nextInt(6)
            val userid: Int = 1 + random.nextInt(6)
            // Assemble the record
            array += timestamp + " " + area + " " + city + " " + userid + " " + adid
        }
        array.toArray
    }
    def createKafkaProducer(broker: String): KafkaProducer[String, String] = {
        // Create the producer configuration
        val prop = new Properties()
        // Set the required properties
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker)
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer")
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer")
        // Build the Kafka producer from the configuration
        new KafkaProducer[String, String](prop)
    }

    def main(args: Array[String]): Unit = {
        // Read the Kafka settings from config.properties
        val config: Properties = PropertiesUtil.load("config.properties")
        val broker: String = config.getProperty("kafka.broker.list")
        val topic = "sparkAds"
        // Create the Kafka producer
        val kafkaProducer: KafkaProducer[String, String] = createKafkaProducer(broker)
        while (true) {
            // Generate mock records and send them to the Kafka cluster via the producer
            for (line <- generateMockData()) {
                kafkaProducer.send(new ProducerRecord[String, String](topic, line))
                println(line)
            }
            Thread.sleep(2000)
        }
    }
}

2. Requirement 1: Ad Blacklist

Implement a real-time, dynamic blacklist: any user who clicks a given ad more than 100 times in a single day is blacklisted.
Note: the blacklist is stored in MySQL.

2.1 Approach


  1. After reading the data from Kafka, validate each record against the blacklist stored in MySQL;
  2. For records that pass the check, increment that user's click count for the ad by one in MySQL;
  3. After writing to MySQL, check the count again: if a user exceeds 100 clicks in a single day, add the user to the blacklist (see the sketch below).
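
The core of steps 2 and 3 is a single upsert followed by a threshold check. A minimal sketch using the JdbcUtil helper from section 2.3 (the example values are made up; the full streaming job is in section 2.4):

scala
// One parsed click: day 20240601, user 3, ad 2
val (dt, userid, adid) = ("20240601", "3", "2")
val conn = JdbcUtil.getConnection
// Step 2: add one click to the user's daily count for this ad
JdbcUtil.executeUpdate(conn,
    "INSERT INTO user_ad_count (dt,userid,adid,count) VALUES (?,?,?,?) ON DUPLICATE KEY UPDATE count=count+?",
    Array(dt, userid, adid, 1, 1))
// Step 3: re-read the count and blacklist the user once it reaches 100
val ct = JdbcUtil.getDataFromMysql(conn,
    "SELECT count FROM user_ad_count WHERE dt=? AND userid=? AND adid=?", Array(dt, userid, adid))
if (ct >= 100) {
    JdbcUtil.executeUpdate(conn,
        "INSERT INTO black_list (userid) VALUES (?) ON DUPLICATE KEY UPDATE userid=?", Array(userid, userid))
}
conn.close()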

2.2 MySQL Table Creation

sql
-- Table of blacklisted users
CREATE TABLE black_list (userid CHAR(1) PRIMARY KEY);
-- Per-day click counts of each user on each ad
CREATE TABLE user_ad_count (
    dt varchar(255),
    userid CHAR (1),
    adid CHAR (1),
    count BIGINT,
PRIMARY KEY (dt, userid, adid)
);

2.3 Kafka and MySQL Utility Classes

Create utility classes for reading Kafka data in Spark Streaming and for working with MySQL.

scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

import java.util.Properties

object MyKafkaUtil {
    // 1. Load the configuration file
    private val properties: Properties = PropertiesUtil.load("config.properties")
    // 2. Broker addresses used to connect to the cluster
    val broker_list: String = properties.getProperty("kafka.broker.list")
    // 3. Kafka consumer configuration
    val kafkaParam = Map(
        "bootstrap.servers" -> broker_list,
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        // Consumer group
        "group.id" -> "commerce-consumer-group",
        // Used when there is no initial offset, or the current offset no longer exists on any server;
        // "latest" resets the offset to the most recent record
        "auto.offset.reset" -> "latest",
        // true: offsets are committed automatically in the background, which can lose data if the job fails;
        // false: offsets must be maintained manually
        "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    // Create a DStream that returns the received input data
    // LocationStrategies: controls how consumers are scheduled relative to partitions
    // LocationStrategies.PreferConsistent: distribute partitions evenly across all executors
    // ConsumerStrategies: controls how Kafka consumers are created and configured on the driver and executors
    // ConsumerStrategies.Subscribe: subscribe to a fixed collection of topics
    def getKafkaStream(topic: String, ssc: StreamingContext):
    InputDStream[ConsumerRecord[String, String]] = {
        val dStream: InputDStream[ConsumerRecord[String, String]] =
            KafkaUtils.createDirectStream[String, String](ssc,
                LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String,
                    String](Array(topic), kafkaParam))
        dStream
    }
}
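
A minimal usage sketch for this utility (the topic name sparkAds matches the jobs later in this document; the app name is arbitrary):

scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Consume the topic through MyKafkaUtil and print the raw record values
val ssc = new StreamingContext(new SparkConf().setMaster("local[*]").setAppName("MyKafkaUtilDemo"), Seconds(5))
MyKafkaUtil.getKafkaStream("sparkAds", ssc).map(_.value()).print()
ssc.start()
ssc.awaitTermination()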
scala
import com.zaxxer.hikari.{HikariConfig, HikariDataSource}

import java.sql.{Connection, PreparedStatement, ResultSet}
import java.util.Properties
import javax.sql.DataSource

object JdbcUtil {
    // Connection pool, initialized once
    var dataSource: DataSource = init()

    // Initialize the connection pool
    def init(): DataSource = {
        val properties = new Properties()
        val config: Properties = PropertiesUtil.load("config.properties")
        properties.setProperty("driverClassName", "com.mysql.jdbc.Driver")
        properties.setProperty("url", config.getProperty("jdbc.url"))
        properties.setProperty("username", config.getProperty("jdbc.user"))
        properties.setProperty("password", config.getProperty("jdbc.password"))
        properties.setProperty("maxActive",
            config.getProperty("jdbc.datasource.size"))
        new HikariDataSource(new HikariConfig(properties))
    }

    // Get a MySQL connection from the pool
    def getConnection: Connection = {
        dataSource.getConnection
    }

    // Execute a single insert/update statement
    def executeUpdate(connection: Connection, sql: String, params: Array[Any]): Int = {
        var rtn = 0
        var pstmt: PreparedStatement = null
        try {
            connection.setAutoCommit(false)
            pstmt = connection.prepareStatement(sql)
            if (params != null && params.length > 0) {
                for (i <- params.indices) {
                    pstmt.setObject(i + 1, params(i))
                }
            }
            rtn = pstmt.executeUpdate()
            connection.commit()
            pstmt.close()
        } catch {
            case e: Exception => e.printStackTrace()
        }
        rtn
    }

    // Execute a batched insert/update
    def executeBatchUpdate(connection: Connection, sql: String, paramsList: Iterable[Array[Any]]): Array[Int] = {
        var rtn: Array[Int] = null
        var pstmt: PreparedStatement = null
        try {
            connection.setAutoCommit(false)
            pstmt = connection.prepareStatement(sql)
            for (params <- paramsList) {
                if (params != null && params.length > 0) {
                    for (i <- params.indices) {
                        pstmt.setObject(i + 1, params(i))
                    }
                    pstmt.addBatch()
                }
            }
            rtn = pstmt.executeBatch()
            connection.commit()
            pstmt.close()
        } catch {
            case e: Exception => e.printStackTrace()
        }
        rtn
    }

    // Check whether a matching row exists
    def isExist(connection: Connection, sql: String, params: Array[Any]): Boolean = {
        var flag: Boolean = false
        var pstmt: PreparedStatement = null
        try {
            pstmt = connection.prepareStatement(sql)
            for (i <- params.indices) {
                pstmt.setObject(i + 1, params(i))
            }
            flag = pstmt.executeQuery().next()
            pstmt.close()
        } catch {
            case e: Exception => e.printStackTrace()
        }
        flag
    }

    // Fetch a single long value from MySQL
    def getDataFromMysql(connection: Connection, sql: String, params: Array[Any]): Long = {
        var result: Long = 0L
        var pstmt: PreparedStatement = null
        try {
            pstmt = connection.prepareStatement(sql)
            for (i <- params.indices) {
                pstmt.setObject(i + 1, params(i))
            }
            val resultSet: ResultSet = pstmt.executeQuery()
            while (resultSet.next()) {
                result = resultSet.getLong(1)
            }
            resultSet.close()
            pstmt.close()
        } catch {
            case e: Exception => e.printStackTrace()
        }
        result
    }
}
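
For example, the blacklist check used in the next section boils down to this (a sketch; the userid value is arbitrary):

scala
// Returns true if the user is already in the blacklist table
val conn = JdbcUtil.getConnection
val blacklisted: Boolean = JdbcUtil.isExist(conn, "SELECT * FROM black_list WHERE userid = ?", Array("3"))
conn.close()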

2.4 Implementation

scala
package com.rocket.spark.streaming.exercise

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.sql.Connection
import java.text.SimpleDateFormat
import java.util.Date

// Ad click record matching the mock data format: timestamp area city userid adid
case class Ads_log(timestamp: Long, area: String, city: String, userid: String, adid: String)

object StreamingBlackList {
    def main(args: Array[String]): Unit = {
        // 1. Initialize the Spark configuration
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamingBlackList")
        val ssc = new StreamingContext(sparkConf, Seconds(5))

        // 2. Define the Kafka parameters
        val kafkaParam: Map[String, Object] = Map[String, Object](
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG ->
                "hadoop102:9092,hadoop103:9092,hadoop104:9092",
            ConsumerConfig.GROUP_ID_CONFIG -> "jack",
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
            "key.deserializer" ->
                "org.apache.kafka.common.serialization.StringDeserializer",
            "value.deserializer" ->
                "org.apache.kafka.common.serialization.StringDeserializer"
        )

        // 3. Create a DStream from Kafka
        val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
            KafkaUtils.createDirectStream[String, String](ssc,
                LocationStrategies.PreferConsistent,
                ConsumerStrategies.Subscribe[String, String](Set("sparkAds"), kafkaParam))

        // 4. Parse each record's value into an Ads_log
        val valueDStream: DStream[Ads_log] = kafkaDStream.map(
            record => {
                val lines: Array[String] = record.value().split(" ")
                Ads_log(lines(0).toLong, lines(1), lines(2), lines(3), lines(4))
            })
        // Filter the current batch against the blacklist stored in MySQL
        val filterRdd: DStream[Ads_log] = valueDStream.transform(rdd => {
            rdd.filter(adsLog => {
                val connection: Connection = JdbcUtil.getConnection
                val sql =
                    """
                      | select * from black_list
                      | where userid=?
                      |""".stripMargin
                val bool = JdbcUtil.isExist(connection, sql, Array(adsLog.userid))
                connection.close()
                !bool
            })
        })

        // Aggregate clicks per (day, user, ad) within this batch
        val sdf = new SimpleDateFormat("yyyyMMdd")
        val clickCountRdd: DStream[((String, String, String), Int)] = filterRdd.map(adsLog => {
            val date: String = sdf.format(new Date(adsLog.timestamp))
            ((date, adsLog.userid, adsLog.adid), 1)
        }).reduceByKey(_ + _)
        // Update the daily totals in MySQL and blacklist users that cross the threshold
        clickCountRdd.foreachRDD(rdd => {
            rdd.foreachPartition(iter => {
                val connection: Connection = JdbcUtil.getConnection
                iter.foreach {
                    case ((dt, userid, adid), sum) =>
                        JdbcUtil.executeUpdate(connection,
                            """
                              |INSERT INTO user_ad_count (dt,userid,adid,count)
                              |VALUES (?,?,?,?)
                              |ON DUPLICATE KEY
                              |UPDATE count=count+?
                            """.stripMargin, Array(dt, userid, adid, sum, sum))

                        val ct: Long = JdbcUtil.getDataFromMysql(connection,
                            """select count from
                              | user_ad_count where dt=? and userid=? and adid =?
                            """.stripMargin, Array(dt, userid, adid))

                        // Blacklist the user once the daily total reaches the 100-click threshold
                        if (ct >= 100) {
                            JdbcUtil.executeUpdate(connection,
                                """ INSERT INTO black_list (userid)
                                  | VALUES (?) ON DUPLICATE KEY update userid=?
                            """.stripMargin, Array(userid, userid))
                        }
                }
                connection.close()
            })
        })
        ssc.start()
        ssc.awaitTermination()
    }
}

3. Real-Time Ad Click Statistics

Description: in real time, compute the daily total click count for each ad in each area and city, and store the results in MySQL.

3.1 Approach

  1. Within each batch, aggregate the data by day;
  2. Combine the batch result with the data already stored in MySQL to update the totals (see the sketch below).
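
Step 2 relies on MySQL's INSERT ... ON DUPLICATE KEY UPDATE so that each batch's count is added to whatever total is already stored. A minimal sketch with made-up values (the table is created in 3.2 below, and JdbcUtil comes from section 2.3):

scala
// Add a batch count of 5 to the stored total for one (day, area, city, ad) key
val conn = JdbcUtil.getConnection
JdbcUtil.executeUpdate(conn,
    "INSERT INTO area_city_ad_count (dt,area,city,adid,count) VALUES (?,?,?,?,?) ON DUPLICATE KEY UPDATE count=count+?",
    Array("20240601", "华北", "北京", "2", 5, 5))
conn.close()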

3.2 MySQL Table Creation

sql
CREATE TABLE area_city_ad_count (
    dt VARCHAR(255),
    area VARCHAR(255),
    city VARCHAR(255),
    adid VARCHAR(255),
    count BIGINT,
    PRIMARY KEY (dt,area,city,adid)
);

3.3 Implementation

scala
package com.rocket.spark.streaming.exercise

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.sql.Connection
import java.text.SimpleDateFormat
import java.util.Date

object StreamingDayCount {
    def main(args: Array[String]): Unit = {
        // 1. Initialize the Spark configuration
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamingDayCount")
        val ssc = new StreamingContext(sparkConf, Seconds(5))

        // 2. Define the Kafka parameters
        val kafkaParam: Map[String, Object] = Map[String, Object](
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG ->
                "hadoop102:9092,hadoop103:9092,hadoop104:9092",
            ConsumerConfig.GROUP_ID_CONFIG -> "jack01",
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
            "key.deserializer" ->
                "org.apache.kafka.common.serialization.StringDeserializer",
            "value.deserializer" ->
                "org.apache.kafka.common.serialization.StringDeserializer"
        )

        // 3. Create a DStream from Kafka
        val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
            KafkaUtils.createDirectStream[String, String](ssc,
                LocationStrategies.PreferConsistent,
                ConsumerStrategies.Subscribe[String, String](Set("sparkAds"), kafkaParam))

        // 4. Parse each record into ((dt, area, city, adid), 1)
        val valueDStream: DStream[((String, String, String, String), Int)] = kafkaDStream.map(
            record => {
                val lines: Array[String] = record.value().split(" ")
                val sdf = new SimpleDateFormat("yyyyMMdd")
                val date: String = sdf.format(new Date(lines(0).toLong))
                ((date, lines(1), lines(2), lines(4)), 1)
            })

        val clickCountRdd: DStream[((String, String, String, String), Int)] = valueDStream.reduceByKey(_ + _)
        // Write the per-day, per-area, per-city, per-ad totals to MySQL
        clickCountRdd.foreachRDD(rdd => {
            rdd.foreachPartition(iter => {
                val connection: Connection = JdbcUtil.getConnection
                iter.foreach {
                    case ((dt, area, city, adid), sum) =>
                        JdbcUtil.executeUpdate(connection,
                            """
                              |INSERT INTO area_city_ad_count (dt,area,city,adid,count)
                              |VALUES (?,?,?,?,?)
                              |ON DUPLICATE KEY
                              |UPDATE count=count+?
                            """.stripMargin, Array(dt, area, city, adid, sum, sum))
                }
                connection.close()
            })
        })
        ssc.start()
        ssc.awaitTermination()
    }
}

4. Ad Clicks in the Last Hour

4.1 Approach

  1. Open a window to define the time range;
  2. Within the window, reshape the data into ((adid, hm), count);
  3. Group by ad id and, within each group, sort by hour and minute (see the sketch below).
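
A minimal sketch of the ((adid, hm), count) structure described above, assuming adsLogStream is a DStream[Ads_log] built as in section 2.4 (note that the implementation below simplifies this to plain 10-second time buckets over a 60-second window):

scala
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream

// Count clicks per (adid, hour:minute) over the last hour, sliding every 10 seconds,
// then group by ad id and sort each group by time
val hmFormat = new SimpleDateFormat("HH:mm")
val lastHour: DStream[(String, List[(String, Int)])] = adsLogStream
    .map(log => ((log.adid, hmFormat.format(new Date(log.timestamp))), 1))
    .reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(3600), Seconds(10))
    .map { case ((adid, hm), cnt) => (adid, (hm, cnt)) }
    .groupByKey()
    .mapValues(_.toList.sortBy(_._1))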

4.2 Implementation

scala
package com.rocket.spark.streaming.exercise

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.io.{File, FileWriter, PrintWriter}
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable.ListBuffer

object StreamingOneHour {
    def main(args: Array[String]): Unit = {
        // 1. Initialize the Spark configuration
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamingOneHour")
        val ssc = new StreamingContext(sparkConf, Seconds(5))

        // 2. Define the Kafka parameters
        val kafkaParam: Map[String, Object] = Map[String, Object](
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG ->
                "hadoop102:9092,hadoop103:9092,hadoop104:9092",
            ConsumerConfig.GROUP_ID_CONFIG -> "jack01",
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
            "key.deserializer" ->
                "org.apache.kafka.common.serialization.StringDeserializer",
            "value.deserializer" ->
                "org.apache.kafka.common.serialization.StringDeserializer"
        )

        // 3. Create a DStream from Kafka
        val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
            KafkaUtils.createDirectStream[String, String](ssc,
                LocationStrategies.PreferConsistent,
                ConsumerStrategies.Subscribe[String, String](Set("sparkAds"), kafkaParam))

        // 4. Parse each record's value into an Ads_log
        val valueDStream: DStream[Ads_log] = kafkaDStream.map(
            record => {
                val lines: Array[String] = record.value().split(" ")
                Ads_log(lines(0).toLong, lines(1), lines(2), lines(3), lines(4))
            })
        // Window computation: a 60-second window sliding every 10 seconds is used here for the demo
        // (the requirement's "last hour" would correspond to Seconds(3600))
        val windowRdd: DStream[(Long, Int)] = valueDStream.map(data => {
            // Round the timestamp down to a 10-second bucket
            val newTs = data.timestamp / 10000 * 10000
            (newTs, 1)
        }).reduceByKeyAndWindow((x: Int, y: Int) => {
            x + y
        }, Seconds(60), Seconds(10))
        windowRdd.foreachRDD(rdd => {
            // Collect the sorted time buckets and write them out as a small JSON array (e.g. for a chart)
            val buffer: ListBuffer[String] = ListBuffer[String]()
            val datas: Array[(Long, Int)] = rdd.sortByKey(true).collect()
            datas.foreach{
                case (time, cnt) => {
                    val timeStr: String = new SimpleDateFormat("mm:ss").format(new Date(time))
                    buffer.append(s"""{"xtime": "$timeStr", "yval": "$cnt"}""")
                }
            }
            val out = new PrintWriter(new FileWriter(new File("datas/adClick.json")))
            out.print("["+buffer.mkString(",")+"]")
            out.flush()
            out.close()
        })
        ssc.start()
        ssc.awaitTermination()
    }
}