Reactor操作符详解:flatMap/transform/defer/concatMap
请详细解释Reactor(Project Reactor)中常用且容易混淆的操作符:flatMap vs concatMap vs flatMapSequential、transform vs compose、defer、delayElements、timeout、retry、repeat。它们的内部原理和适用场景是什么?
回答
屠龙少年
flatMap vs concatMap vs flatMapSequential
flatMap:将每个元素映射为Publisher,合并所有Publisher的输出(内部订阅是并发的,结果顺序不确定)
[1, 2, 3] → flatMap(i -> Mono.delay...)
// 可能输出: 3, 1, 2(延迟短的先输出)
concatMap:按源序列顺序逐个处理,前一个Publisher完成后再处理下一个(保持顺序,但性能较低)
[1, 2, 3] → concatMap(i -> getAsync(i))
// 输出: 1, 2, 3(保证顺序)
flatMapSequential:内部订阅并发,但按元素原始顺序输出结果(性能+顺序兼顾)
transform vs compose
transform:每个订阅者共享同一操作符链(一次创建,多次复用)
Function<Flux<String>, Flux<String>> op = f -> f.filter(s -> s.length() > 3).map(String::toUpperCase);
flux.transform(op) // 所有订阅者用同一个转换实例
compose:每个订阅者重新创建操作符链(延迟创建,每个订阅独立)
flux.compose(op) // 每次订阅都重新创建op实例
// 适合:需要每次获取最新配置的场景
defer
defer:延迟创建数据源,直到有订阅者时才创建。
// 立即创建(订阅前就确定了值)
Mono.just(System.currentTimeMillis())
// 延迟创建(每次订阅时获取最新值)
Mono.defer(() -> Mono.just(System.currentTimeMillis()))
其他重要操作符
// delay:延迟发射
delayElements(Duration.ofMillis(100)) // 每个元素延迟100ms
// timeout:超时
flux.timeout(Duration.ofSeconds(5))
flux.timeout(Duration.ofSeconds(5), Mono.just("fallback")) // 超时降级
// retry:重试
flux.retry(3) // 固定次数
flux.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))) // 指数退避
// repeat:重复
flux.repeat(2) // 订阅2次
选择建议
- 需要顺序处理且依赖顺序 → concatMap
- 需要并发处理且不关心顺序 → flatMap
- 需要并发处理且必须保持顺序 → flatMapSequential
- 需要每次订阅不同数据源 → defer
- 需要每次订阅不同配置 → compose