响应式编程中的背压(Backpressure)实现机制
解释响应式编程中的背压(Backpressure)概念及其实现机制,包括Reactor中的背压操作符(onBackpressureBuffer/Drop/Latest)、Request的累积策略以及RxJava的背压策略对比。
回答
编译有声
1. 背压产生原因
- 生产者速度 > 消费者处理速度
- 没有背压时:内存溢出、数据丢失或阻塞
2. 背压协议(通过Subscription.request(n))
// 消费者通过request控制流速
subscriber.onSubscribe(new Subscription() {
public void request(long n) {
// 告诉生产者:我还能处理n个元素
producer.produce(n);
}
});
3. Reactor背压操作符
- onBackpressureBuffer(int capacity):缓存到队列
Flux.interval(Duration.ofMillis(1)) .onBackpressureBuffer(1000) // 容量1000 .subscribe(slowConsumer); - onBackpressureDrop(Consumer):丢弃超出容量的元素
.onBackpressureDrop(dropped -> log.warn("Dropped: {}", dropped)) - onBackpressureLatest():只保留最新值,覆盖旧值
.onBackpressureLatest() // 消费者永远拿到最新元素 - onBackpressureError():缓冲区满时抛出异常
4. Request批处理策略
// 默认:request(Long.MAX_VALUE) 无界
// 手动控制:
Flux.create(sink -> {
sink.onRequest(n -> { // 消费者每次request触发
for (int i = 0; i < n; i++) {
sink.next(data);
}
});
});
5. RxJava背压策略(对比) | 库 | 策略 | 特点 | |------|------|------| | Reactor | onBackpressureBuffer/Drop/Latest/Error | 操作符链式组合 | | RxJava 2+ | BackpressureStrategy.BUFFER/DROP/LATEST/ERROR/MISSING | Flowable支持背压 | | RxJava 1 | Observable不支持背压→Flowable出现 | 历史包袱 |
6. 底层机制
- Reactor通过
Subscription.request(n)实现请求驱动 - 生产者每次
onNext()前检查当前requested count是否>0 - 请求是累加的(
request(3)+request(2)= 5) - 操作符链会逐级传递request请求