Skip to content

DStream转换

DStream上的操作与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window相关的原语。

1. 无状态转化操作

无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD。部分无状态转化操作列在了下表中。注意,针对键值对的DStream转化操作(比如reduceByKey())要添加import StreamingContext._才能在Scala 中使用。 Alt text 需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD(批次)组成,且无状态转化操作是分别应用到每个RDD上的。
例如:reduceByKey()会归约每个时间区间中的数据,但不会归约不同区间之间的数据。

1.1 Transform

Transform允许DStream上执行任意的RDD-to-RDD函数。即使这些函数并没有在DStream的API中暴露出来,通过该函数可以方便的扩展Spark API。该函数每一批次调度一次。其实也就是对DStream中的RDD应用转换。

scala
package com.rocket.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamByTransform {
    def main(args: Array[String]): Unit = {
        //1.初始化 Spark 配置信息
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("DStreamByTransform")
        //2.初始化 SparkStreamingContext
        val ssc = new StreamingContext(sparkConf, Seconds(3))

        val wordCountStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102", 7777)

        val countDStream: DStream[(String, Int)] = wordCountStream.transform(
            rdd => {
                // 在Driver端,周期性的执行
                val wordRdd: RDD[String] = rdd.flatMap(item=>{
                    // 在Executor端执行
                    item.split(" ")
                })
                val countRdd: RDD[(String, Int)] = wordRdd.map((_, 1)).reduceByKey(_ + _)
                countRdd
            })

        // transform主要用在代码需要在Driver周期性执行场景,其他场景直接用RDD更加简洁
        // wordCountStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
        countDStream.print

        // 启动任务
        ssc.start()
        ssc.awaitTermination()
    }
}

运行结果:
Alt text

提示

transform和foreachRdd都是外面是Driver执行,里面中间是Driver周期执行, 里面rdd是Executor执行。

1.2 join

两个流之间的join需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的 RDD 进行join,与两个RDD的join效果相同。

scala
package com.rocket.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamByJoin {
    def main(args: Array[String]): Unit = {
        //1.初始化 Spark 配置信息
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("DStreamByTransform")
        //2.初始化 SparkStreamingContext, 采集周期设置大一些,避免join不到
        val ssc = new StreamingContext(sparkConf, Seconds(6))
        val wordCountStream1 = ssc.socketTextStream("hadoop102", 7777)//.repartition(1)
        val wordCountStream2 = ssc.socketTextStream("hadoop103", 8888)//.repartition(1)

        val ds1: DStream[(String, String)] = wordCountStream1.flatMap(_.split(" ")).map((_, "ds1:1"))
        val ds2: DStream[(String, String)] = wordCountStream2.flatMap(_.split(" ")).map((_, "ds2:a"))

        val ds3: DStream[(String, (String, String))] = ds1.join(ds2)
        ds3.print()

        // 启动任务
        ssc.start()
        ssc.awaitTermination()
    }
}

运行结果:
Alt text

2. 有状态转化操作

2.1 UpdateStateByKey

UpdateStateByKey原语用于记录历史记录,有时,我们需要在DStream中跨批次维护状态(例如流计算中累加wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对形式的DStream。给定一个由(键,事件)对构成的DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的DStream,其内部数据为(键,状态)对。
updateStateByKey()的结果会是一个新的DStream,其内部的RDD序列是由每个时间区间对应的(键,状态)对组成的。 updateStateByKey操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功能,需要做下面两步:

  1. 定义状态,状态可以是一个任意的数据类型。
  2. 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。
    使用updateStateByKey需要对检查点目录进行配置,会使用检查点来保存状态。
scala
package com.rocket.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamByUpdateStateByKey {
    def main(args: Array[String]): Unit = {
        //1.初始化 Spark 配置信息
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("DStreamByRddQueue")
        //2.初始化 SparkStreamingContext
        val ssc = new StreamingContext(sparkConf, Seconds(3))
        // 设置检查点,因为有状态过后需要保存累计数据
        ssc.checkpoint("ck")
        val lines = ssc.socketTextStream("hadoop102", 7777)

        val wordRdd: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1))

        /**
         * updateStateByKey根据key对数据的状态进行更新
         * 传递的参数有两个, 
         * 第一个参数为相同的key的value值,
         * 第二个参数表示缓存区相同key的value数据
         */
        val stateDstream = wordRdd.updateStateByKey(
            (seq: Seq[Int], buff:Option[Int])=>{
                val newCount = buff.getOrElse(0) + seq.length
                Option(newCount)
            }
        )
        stateDstream.print()
        // 启动任务
        ssc.start()
        ssc.awaitTermination()
    }
}

运行结果:
Alt text

2.2 WindowOperations

Window Operations可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming的允许状态。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。

  • 窗口时长:计算内容的时间范围;
  • 滑动步长:隔多久触发一次计算。

提示

窗口时长和滑动步长都必须为采集周期大小的整数倍。

关于Window的操作还有如下方法:

  1. window(windowLength, slideInterval): 基于对源DStream窗化的批次进行计算返回一个新的Dstream;
  2. countByWindow(windowLength, slideInterval): 返回一个滑动窗口计数流中的元素个数;
  3. reduceByWindow(func, windowLength, slideInterval): 通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流;
  4. reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): 当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处通过对滑动窗口中批次数据使用reduce函数来整合每个key的value值。
  5. reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): 这个函数是上述函数的变化版本,每个窗口的reduce值都是通过用前一个窗的reduce值来递增计算。通过reduce进入到滑动窗口数据并"反向 reduce"离开窗口的旧数据来实现这个操作。一个例子是随着窗口滑动对 keys 的"加""减"计数。通过前边介绍可以想到,这个函数只适用于"可逆的reduce函数",也就是这些reduce函数有相应的"反 reduce"函数(以参数invFunc形式传入)。如前述函数,reduce任务的数量通过可选参数来配置。
scala
package com.rocket.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamByWindow {
    def main(args: Array[String]): Unit = {
        //1.初始化 Spark 配置信息
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("DStreamByRddQueue")
        //2.初始化 SparkStreamingContext
        val ssc = new StreamingContext(sparkConf, Seconds(3))
        // 设置检查点,因为有状态过后需要保存累计数据
        ssc.checkpoint("ck")
        val lines = ssc.socketTextStream("hadoop102", 7777)

        val wordRdd: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1))
        // 窗口的采集范围应该式采集周期的整数倍, 设置步长避免数据重复计算
        val windowDS: DStream[(String, Int)] = wordRdd.window(Seconds(6), Seconds(6))

        val wordCountRdd: DStream[(String, Int)] = windowDS.reduceByKey(_ + _)
        wordCountRdd.print()
        // 启动任务
        ssc.start()
        ssc.awaitTermination()
    }
}

运行结果:
Alt text