Spring WebFlux响应式编程与背压机制
请详细解释Spring WebFlux的响应式编程模型和**背压(Backpressure)**机制。WebFlux基于Reactor库(Flux/Mono)如何工作?背压是什么?为什么在响应式编程中重要?如何控制背压策略(BUFFER/DROP/LATEST/ERROR)?WebFlux的线程模型与Spring MVC有什么不同?
回答
屠龙少年
Reactor核心类型
- Mono:0或1个元素的异步序列
- Flux:0到N个元素的异步序列
// 创建
Mono<String> mono = Mono.just("hello");
Flux<Integer> flux = Flux.range(1, 100);
// 操作符
flux.map(i -> i * 2)
.filter(i -> i > 10)
.flatMap(i -> Mono.just(i * 3))
.subscribe(System.out::println);
背压(Backpressure)
背压定义:消费者处理速度跟不上生产者时,消费者反向通知生产者降低发送速率的一种反馈机制。
为什么重要:
- 生产者(上游)可能远快于消费者(下游)
- 没有背压会导致:内存溢出、响应延迟增加、系统雪崩
背压策略:
Flux.range(1, 1_000_000)
.onBackpressureBuffer(1000) // 缓冲(默认),满则报错
.onBackpressureDrop(x -> log.warn("丢弃: {}", x)) // 丢弃新元素
.onBackpressureLatest() // 只保留最新的
.onBackpressureError() // 超出就报错
.subscribe(new BaseSubscriber<>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(1); // 每次只请求1个元素
}
});
WebFlux vs MVC线程模型
| 特性 | Spring MVC(Tomcat) | Spring WebFlux(Netty) |
|---|---|---|
| 线程模型 | 请求-线程(1请求1线程) | 事件驱动(少量线程处理大量请求) |
| 线程数 | 200(Tomcat默认) | 少量(CPU核心数) |
| 阻塞操作 | 允许(线程挂起) | ❌ 必须非阻塞,否则阻塞事件循环 |
| IO方式 | Servlet API(阻塞) | Reactive Streams(非阻塞) |
| 数据库 | JDBC(阻塞) | R2DBC(响应式JDBC) |
| 适用 | 传统应用、IO非密集型 | 高并发、IO密集型、网关 |
WebFlux Controller示例
@RestController
@RequestMapping("/users")
public class UserController {
@GetMapping("/{id}")
public Mono<User> getUser(@PathVariable String id) {
return userService.findById(id);
}
@GetMapping
public Flux<User> listUsers() {
return userService.findAll();
}
}
核心注意点
- 所有操作必须非阻塞:不能调用Thread.sleep()、JDBC等阻塞API
- 使用专门的响应式驱动:MongoDB Reactive、R2DBC、Reactive Redis
- 错误处理:用onErrorReturn/onErrorResume/onErrorMap替代try-catch
- 调度器:阻塞操作需放到Schedulers.boundedElastic()执行