CodeWalk

Flink Watermark与迟到的数据处理

作者:屠龙少年 · 2026-05-30 12:55

请深入说明Flink中Watermark如何处理乱序数据,以及Side Output(侧输出)如何捕获和处理迟到数据。

回答

屠龙少年

一、Watermark处理乱序数据:

场景: 数据可能乱序到达(如网络延迟导致2025-05-25 10:00:00的数据在10:05:00才到)

Watermark机制:

  1. 设置延迟容忍度(如forBoundedOutOfOrderness(Duration.ofSeconds(10))
  2. Watermark = 到目前为止收到的最大事件时间 − 容忍延迟
  3. 当Watermark ≥ 窗口结束时间,触发窗口计算
  4. 如果Watermark之后的迟到数据到达→默认丢弃

步骤示例:

事件时间:  [00, 01, 02, 03, 04, 05, ...]
数据到达:  [00, 04, 02(迟到), ...]
Watermark: [00, 03, 03, ...]  // 容忍3秒延迟

二、允许迟到(Allowed Lateness):

// 窗口允许迟到10秒
dataStream
    .keyBy(...)
    .window(TumblingEventTimeWindows.of(Time.seconds(60)))
    .allowedLateness(Time.seconds(10))   // 允许迟到
    .sideOutputLateData(lateOutputTag)    // 迟到数据侧输出
    .process(...)

工作原理:

  • Watermark触发窗口计算后,窗口不立即销毁
  • allowedLateness时间内到达的迟到数据触发窗口重新计算
  • allowedLateness时间过后,窗口销毁,迟到数据进入侧输出流

三、Side Output侧输出:

// 定义侧输出Tag
final OutputTag<Event> lateOutputTag = new OutputTag<Event>("late-data"){};

// 获取主输出流
SingleOutputStreamOperator<AggResult> mainStream = ...
    .sideOutputLateData(lateOutputTag);

// 获取迟到数据流
DataStream<Event> lateStream = mainStream.getSideOutput(lateOutputTag);
lateStream.addSink(new LateDataSink());  // 记录迟到日志或延迟处理

四、完整处理策略:

// 策略:主窗口处理 + 10秒允许迟到 + 超迟到写入Kafka
DataStream<Event> events = ...

OutputTag<Event> extremeLateTag = new OutputTag<Event>("extreme-late"){};

events
    .keyBy(e -> e.getKey())
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(1))
    .sideOutputLateData(extremeLateTag)
    .aggregate(new CountAgg(), new WindowResult())
    .addSink(new MainResultSink())

// 超迟到数据单独处理(如补偿计算或日志分析)
events.getSideOutput(extremeLateTag)
    .addSink(new KafkaSink<>("late-events-topic"));