Flink的 Side Output侧输出和 ProcessFunction

发布时间:2026/6/3 1:14:55

Flink的 Side Output侧输出和 ProcessFunction Side Output侧输出在Flink处理数据流时常常会面临这样的情况需要对一个数据源进行处理该数据源包含不同类型的数据我们需要将其分割处理。使用filter算子对数据源进行筛选分割会导致数据流的多次复制从而造成不必要的性能浪费。为了解决这个问题Flink引入了侧输出Side Output机制该机制可以将数据流进行分割而无需对流进行复制。使用侧输出时用户可以通过定义输出标签Output Tag来标识不同的侧输出流。在处理数据流时通过适当的操作符和条件可以将特定类型的数据发送到相应的侧输出流。侧输出适合Flink中流分割处理、异常数据处理、延迟数据处理场景**例如常见的延迟数据处理场景中可以通过侧输出避免丢弃延迟到达的数据。关于Flink中延迟到达的数据在后续章节介绍。案例Flink读取Socket中通话数据将成功和不成功的数据信息分别输出。Java代码实现ProcessFunctionFlink 的 ProcessFunction 是 DataStream API 中的一个重要组成部分它允许用户为流数据定义自定义的处理逻辑。ProcessFunction 提供了一种强大的机制用于低级别的流转换并完全控制数据事件、状态和时间。 ProcessFunction 是一个抽象类继承自 AbstractRichFunction富函数抽象类并有两个泛型类型参数I输入和 O输出表示输入和输出的数据类型富函数类中拥有的方法ProcessFunction 中都可以使用。 ProcessFunction 中有两个核心方法如下: • processElement() 方法 这个方法用于处理每个元素对于流中的每个元素都会调用一次。它的参数包括输入数据值 value、上下文 ctx 和收集器 out。方法没有返回值处理后的输出数据是通过收集器 out 定义的。 / public void processElement(对象类型 value, ProcessFunction对象类型, 返回对象类型,.Context ctx, Collector返回对象类型 out) throws Exception {... ...}• ◦ value表示当前流中正在处理的输入元素类型与流中数据类型一致。 ◦ ctx表示当前运行的上下文可以获取当前的时间戳并提供了定时服务TimerService用于查询时间和注册定时器还可以将数据发送到侧输出流side output。 ◦ out表示输出数据的收集器使用 out.collect() 方法可以向下游发出一个数据。 • onTimer() 方法 这个方法用于定义定时触发的操作只有在注册的定时器触发时才会调用。定时器是通过 TimerService 注册的相当于设定了一个闹钟到达设定的时间就会触发。 注册定时器方法如下 / ctx.timerService().registerProcessingTimeTimer(定时器触发时间);定时器触发后调用onTimer方法如下 / public void onTimer(long timestamp, ProcessFunction对象类型, 返回对象类型,.OnTimerContext ctx, Collector返回对象类型, out) throws Exception {... ...}onTimer() 方法有三个参数时间戳timestamp、上下文ctx和收集器out。timestamp 是设定好的触发时间通常与水位线watermark相关。方法中可以使用上下文和收集器执行相应的操作包括使用定时服务TimerService和输出处理后的数据。 总而言之Flink 的 ProcessFunction 提供了强大的灵活性可以实现各种自定义的业务逻辑可以实现各种基本转换操作如 flatMap、map 和 filter通过获取上下文的方法还可以自定义状态进行聚合操作。同时ProcessFunction 也支持定时触发操作可以根据时间来分组数据并在指定的时间触发计算和输出结果实现窗口window的功能。 案例Flink读取Socket中通话数据如果被叫手机连续5s呼叫失败生成告警信息。(注意该案例涉及到状态编程这里我们只需要了解状态意思即可后续章节会详细讲解状态)• Java代码实现StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); //必须设置checkpoint否则数据不能正常写出到mysql env.enableCheckpointing(5000); /** * socket 中输入数据如下 * 001,186,187,fail,1000,10 * 002,186,187,success,2000,20 * 003,187,188,fail,3000,30 * 004,187,188,fail,4000,40 * 005,188,187,busy,5000,50 */ SingleOutputStreamOperatorStationLog ds env.socketTextStream(node5, 9999) .map(one - { String[] arr one.split(,); return new StationLog(arr[0], arr[1], arr[2], arr[3], Long.valueOf(arr[4]), Long.valueOf(arr[5])); }); //按照被叫号码分组 KeyedStreamStationLog, String keyedStream ds.keyBy(stationLog - stationLog.getCallIn()); //使用ProcessFunction实现通话时长超过5秒的告警 keyedStream.process(new KeyedProcessFunctionString, StationLog, String() { //使用状态记录上一次通话时间 ValueStateLong timeState null; //在open方法中初始化记录时间的状态 Override public void open(Configuration parameters) throws Exception { ValueStateDescriptorLong time new ValueStateDescriptor(time, Long.class); timeState getRuntimeContext().getState(time); } //每来一条数据调用一次 Override public void processElement(StationLog value, KeyedProcessFunctionString, StationLog, String.Context ctx, CollectorString out) throws Exception { //从状态中获取上次状态存储时间 Long time timeState.value(); //如果时间为null说明是第一条数据注册定时器 if(fail.equals(value.callType) time null){ //获取当前时间 long nowTime ctx.timerService().currentProcessingTime(); //注册定时器5秒后触发 long onTime nowTime 5000; ctx.timerService().registerProcessingTimeTimer(onTime); //更新状态 timeState.update(onTime); } // 表示有呼叫成功了可以取消触发器 if (!value.callType.equals(fail) time ! null) { ctx.timerService().deleteProcessingTimeTimer(time); timeState.clear(); } } //定时器触发时调用,执行触发器发出告警 Override public void onTimer(long timestamp, KeyedProcessFunctionString, StationLog, String.OnTimerContext ctx, CollectorString out) throws Exception { out.collect(触发时间: timestamp 被叫手机号 ctx.getCurrentKey() 连续5秒呼叫失败); //清空时间状态 timeState.clear(); } }).print(); env.execute();Scala代码实现val env: StreamExecutionEnvironment StreamExecutionEnvironment.getExecutionEnvironment //导入隐式转换 import org.apache.flink.streaming.api.scala._ //必须设置checkpoint否则数据不能写入mysql env.enableCheckpointing(5000) /** * Socket中输入数据如下 * 001,186,187,fail,1000,10 * 002,186,187,success,2000,20 * 003,187,188,fail,3000,30 * 004,187,188,fail,4000,40 */ val ds: DataStream[StationLog] env.socketTextStream(node5, 9999) .map(line { val arr: Array[String] line.split(,) StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong) }) //设置 被叫号码为key ds.keyBy(_.callIn).process(new KeyedProcessFunction[String,StationLog,String] { //定义一个状态记录上次通话时间 lazy val timeState getRuntimeContext.getState(new ValueStateDescriptor[Long](time, classOf[Long])) //每条数据都会调用一次 override def processElement(value: StationLog, ctx: KeyedProcessFunction[String, StationLog, String]#Context, out: Collector[String]): Unit { //获取当前key对应的状态 val time: Long timeState.value() //如果该被叫手机号呼叫状态是fail且time为0说明是第一条数据注册定时器 if(fail.equals(value.callType) time 0){ //获取当前时间 val nowTime: Long ctx.timerService().currentProcessingTime() //触发定时器时间为当前时间5s val onTime nowTime 5000 //注册定时器 ctx.timerService().registerProcessingTimeTimer(onTime) //更新定时器 timeState.update(onTime) } //如果该被叫手机号呼叫状态不是fail且time不为0表示有呼叫成功了可以取消触发器 if(!value.callType.equals(fail) time!0){ //删除定时器 ctx.timerService().deleteProcessingTimeTimer(time) //清空时间状态 timeState.clear() } } override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, StationLog, String]#OnTimerContext, out: Collector[String]): Unit { //定时器触发时说明该被叫手机号连续5s呼叫失败输出告警信息 out.collect(触发时间: timestamp 被叫手机号 ctx.getCurrentKey 连续5秒呼叫失败) //清空时间状态 timeState.clear() } }).print() env.execute()

相关新闻