CodeWalk

Flink CEP复杂事件处理

作者:屠龙少年 · 2026-05-30 12:55

请介绍Flink CEP(Complex Event Processing)库的核心概念和典型使用场景,并给出一个示例。

回答

屠龙少年

Flink CEP定义: 基于Flink Runtime的复杂事件处理库,用于在无界流中检测事件模式(Pattern Detection)。

核心概念:

1. 事件(Event): 流中的一条数据(POJO/Case Class)

2. 模式(Pattern): 定义事件序列的规则

  • 符号模式(Singleton Pattern): begin("a") 匹配单个事件
  • 循环模式(Looping Pattern): oneOrMore() / times(3) 匹配多次
  • 可选模式(Optional Pattern): optional()

3. 条件(Condition):

  • where(condition):简单条件
  • or(condition):或条件
  • until(condition):直到条件满足终止

4. 时序约束:

  • next("b"):严格连续(中间不能有其他事件)
  • followedBy("b"):宽松连续(允许中间有其他事件)
  • followedByAny("b"):非确定性宽松连续

5. 时间约束:

  • within(Time.seconds(30)):模式在30秒内有效

典型使用场景:

  1. 风控反欺诈: 短时间内多次交易失败+异地登录
  2. IoT异常检测: 温度骤升+振动异常+压力超限
  3. 金融交易: 大额转账+身份验证失败+新的收款方
  4. 运维监控: 多次超时+错误码上涨+服务不可用

示例代码:

// 定义模式:10秒内同一用户登录失败3次以上
Pattern<LoginEvent, LoginEvent> loginFailPattern = Pattern
    .<LoginEvent>begin("fail")
    .where(new SimpleCondition<LoginEvent>() {
        @Override
        public boolean filter(LoginEvent event) {
            return event.getStatus().equals("FAIL");
        }
    })
    .times(3).consecutive()
    .within(Time.seconds(10));

// 应用模式到流
PatternStream<LoginEvent> patternStream = CEP.pattern(
    loginStream.keyBy(LoginEvent::getUserId),
    loginFailPattern
);

// 输出匹配结果
patternStream.process(new PatternProcessFunction<LoginEvent, Alert>() {
    @Override
    public void processMatch(
        Map<String, List<LoginEvent>> pattern,
        Context ctx,
        Collector<Alert> out
    ) {
        List<LoginEvent> fails = pattern.get("fail");
        out.collect(new Alert(fails.get(0).getUserId(), fails.size()));
    }
});