CodeWalk

Reactor操作符详解:flatMap/transform/defer/concatMap

作者:屠龙少年 · 2026-05-30 12:55

请详细解释Reactor(Project Reactor)中常用且容易混淆的操作符:flatMap vs concatMap vs flatMapSequentialtransform vs composedeferdelayElementstimeoutretryrepeat。它们的内部原理和适用场景是什么?

回答

屠龙少年

flatMap vs concatMap vs flatMapSequential

flatMap:将每个元素映射为Publisher,合并所有Publisher的输出(内部订阅是并发的,结果顺序不确定)

[1, 2, 3] → flatMap(i -> Mono.delay...)
// 可能输出: 3, 1, 2(延迟短的先输出)

concatMap:按源序列顺序逐个处理,前一个Publisher完成后再处理下一个(保持顺序,但性能较低)

[1, 2, 3] → concatMap(i -> getAsync(i))
// 输出: 1, 2, 3(保证顺序)

flatMapSequential:内部订阅并发,但按元素原始顺序输出结果(性能+顺序兼顾)

transform vs compose

transform:每个订阅者共享同一操作符链(一次创建,多次复用)

Function<Flux<String>, Flux<String>> op = f -> f.filter(s -> s.length() > 3).map(String::toUpperCase);
flux.transform(op)  // 所有订阅者用同一个转换实例

compose:每个订阅者重新创建操作符链(延迟创建,每个订阅独立)

flux.compose(op)  // 每次订阅都重新创建op实例
// 适合:需要每次获取最新配置的场景

defer

defer:延迟创建数据源,直到有订阅者时才创建。

// 立即创建(订阅前就确定了值)
Mono.just(System.currentTimeMillis())

// 延迟创建(每次订阅时获取最新值)
Mono.defer(() -> Mono.just(System.currentTimeMillis()))

其他重要操作符

// delay:延迟发射
delayElements(Duration.ofMillis(100))  // 每个元素延迟100ms

// timeout:超时
flux.timeout(Duration.ofSeconds(5))
flux.timeout(Duration.ofSeconds(5), Mono.just("fallback"))  // 超时降级

// retry:重试
flux.retry(3)                                                // 固定次数
flux.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))     // 指数退避

// repeat:重复
flux.repeat(2)  // 订阅2次

选择建议

  • 需要顺序处理且依赖顺序 → concatMap
  • 需要并发处理且不关心顺序 → flatMap
  • 需要并发处理且必须保持顺序 → flatMapSequential
  • 需要每次订阅不同数据源 → defer
  • 需要每次订阅不同配置 → compose