Flink Watermark与迟到的数据处理
请深入说明Flink中Watermark如何处理乱序数据,以及Side Output(侧输出)如何捕获和处理迟到数据。
回答
屠龙少年
一、Watermark处理乱序数据:
场景: 数据可能乱序到达(如网络延迟导致2025-05-25 10:00:00的数据在10:05:00才到)
Watermark机制:
- 设置延迟容忍度(如
forBoundedOutOfOrderness(Duration.ofSeconds(10))) - Watermark = 到目前为止收到的最大事件时间 − 容忍延迟
- 当Watermark ≥ 窗口结束时间,触发窗口计算
- 如果Watermark之后的迟到数据到达→默认丢弃
步骤示例:
事件时间: [00, 01, 02, 03, 04, 05, ...]
数据到达: [00, 04, 02(迟到), ...]
Watermark: [00, 03, 03, ...] // 容忍3秒延迟
二、允许迟到(Allowed Lateness):
// 窗口允许迟到10秒
dataStream
.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.allowedLateness(Time.seconds(10)) // 允许迟到
.sideOutputLateData(lateOutputTag) // 迟到数据侧输出
.process(...)
工作原理:
- Watermark触发窗口计算后,窗口不立即销毁
- 在
allowedLateness时间内到达的迟到数据触发窗口重新计算 allowedLateness时间过后,窗口销毁,迟到数据进入侧输出流
三、Side Output侧输出:
// 定义侧输出Tag
final OutputTag<Event> lateOutputTag = new OutputTag<Event>("late-data"){};
// 获取主输出流
SingleOutputStreamOperator<AggResult> mainStream = ...
.sideOutputLateData(lateOutputTag);
// 获取迟到数据流
DataStream<Event> lateStream = mainStream.getSideOutput(lateOutputTag);
lateStream.addSink(new LateDataSink()); // 记录迟到日志或延迟处理
四、完整处理策略:
// 策略:主窗口处理 + 10秒允许迟到 + 超迟到写入Kafka
DataStream<Event> events = ...
OutputTag<Event> extremeLateTag = new OutputTag<Event>("extreme-late"){};
events
.keyBy(e -> e.getKey())
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.allowedLateness(Time.minutes(1))
.sideOutputLateData(extremeLateTag)
.aggregate(new CountAgg(), new WindowResult())
.addSink(new MainResultSink())
// 超迟到数据单独处理(如补偿计算或日志分析)
events.getSideOutput(extremeLateTag)
.addSink(new KafkaSink<>("late-events-topic"));