分流
所谓"分流",就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。
1. 简单实现
案例需求:读取一个整数数字流,将数据流划分为奇数流和偶数流。
其实根据条件筛选数据的需求,本身非常容易实现:只要针对同一条流多次独立调用filter()
方法进行筛选,就可以得到拆分之后的流了。
java
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
DataStreamSource<String> stream = env.socketTextStream("hadoop102", 7777);
// 使用filter实现分流
SingleOutputStreamOperator<String> evenFilter =
stream.filter((FilterFunction<String>) value -> Integer.valueOf(value) % 2 == 0);
SingleOutputStreamOperator<String> oddFilter = stream.filter(value -> Integer.valueOf(value) % 2 == 1);
evenFilter.print("偶数流");
oddFilter.print("奇数流");
env.execute();
}
这种实现非常简单,但代码显得有些冗余——我们的处理逻辑对拆分出的三条流其实是一样的,却重复写了三次。而且这段代码背后的含义,是将原始数据流stream复制三份,然后对每一份分别做筛选;这明显是不够高效的。我们自然想到,能不能不用复制流,直接用一个算子就把它们都拆分开呢?
2. 使用侧输出流
简单来说,只需要调用上下文ctx的output()
方法,就可以输出任意类型的数据了。而侧输出流的标记和提取,都离不开一个"输出标签"(OutputTag),指定了侧输出流的id和类型。
java
private static class WaterSensorProcessFunction extends ProcessFunction<WaterSensor, WaterSensor> {
@Override
public void processElement(WaterSensor value, ProcessFunction<WaterSensor, WaterSensor>.Context ctx, Collector<WaterSensor> out) throws Exception {
if(value.getId().startsWith("s1")){
// s1数据放入s1流中
// OutputTag对象,第一个参数是标签名,第二个参数是放入测流的的数据类型
// ctx为上下文对象,调用它实现将数据放入侧输出流, 第一个参数为utputTag对象, 第二个参数为数据
ctx.output(new OutputTag<WaterSensor>("s1", Types.POJO(WaterSensor.class)), value);
}else if(value.getId().startsWith("s2")){
// s2数据放入s2流中
ctx.output(new OutputTag<WaterSensor>("s2", Types.POJO(WaterSensor.class)), value);
}else{
// 非s1\s2数据放入主流中
out.collect(value);
}
}
}
java
public class FlinkSideOutDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
SingleOutputStreamOperator<WaterSensor> stream =
env.socketTextStream("hadoop102", 7777)
.map(it -> {
String[] split = it.split(",");
return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2]));
});
// 实现分流, 使用底层api方法process()实现, 比如map,flatMap,filter等算子都是基于process
SingleOutputStreamOperator<WaterSensor> process = stream.process(new WaterSensorProcessFunction());
// 从主流中根据标签获取侧输出流
DataStream<WaterSensor> s1 = process.getSideOutput(new OutputTag<WaterSensor>("s1", Types.POJO(WaterSensor.class)));
DataStream<WaterSensor> s2 = process.getSideOutput(new OutputTag<WaterSensor>("s2", Types.POJO(WaterSensor.class)));
// 打印数据
process.print("主流");
// 区分控制台主流,使用红色打印
s1.printToErr("s1流");
s2.printToErr("s2流");
env.execute();
}
}
在necat终端输入数据,查看程序打印输出:
总结测流分流步骤
- 使用process算子
- 定义OutputTag对象
- 调用ctx.output
- 通过主流 获取侧流