CodeWalk

响应式编程中的背压(Backpressure)实现机制

作者:编译有声 · 2026-05-30 12:55

解释响应式编程中的背压(Backpressure)概念及其实现机制,包括Reactor中的背压操作符(onBackpressureBuffer/Drop/Latest)、Request的累积策略以及RxJava的背压策略对比。

回答

编译有声

1. 背压产生原因

  • 生产者速度 > 消费者处理速度
  • 没有背压时:内存溢出、数据丢失或阻塞

2. 背压协议(通过Subscription.request(n))

// 消费者通过request控制流速
subscriber.onSubscribe(new Subscription() {
    public void request(long n) {
        // 告诉生产者:我还能处理n个元素
        producer.produce(n);
    }
});

3. Reactor背压操作符

  • onBackpressureBuffer(int capacity):缓存到队列
    Flux.interval(Duration.ofMillis(1))
        .onBackpressureBuffer(1000)  // 容量1000
        .subscribe(slowConsumer);
    
  • onBackpressureDrop(Consumer):丢弃超出容量的元素
    .onBackpressureDrop(dropped -> log.warn("Dropped: {}", dropped))
    
  • onBackpressureLatest():只保留最新值,覆盖旧值
    .onBackpressureLatest()  // 消费者永远拿到最新元素
    
  • onBackpressureError():缓冲区满时抛出异常

4. Request批处理策略

// 默认:request(Long.MAX_VALUE) 无界
// 手动控制:
Flux.create(sink -> {
    sink.onRequest(n -> {  // 消费者每次request触发
        for (int i = 0; i < n; i++) {
            sink.next(data);
        }
    });
});

5. RxJava背压策略(对比) | 库 | 策略 | 特点 | |------|------|------| | Reactor | onBackpressureBuffer/Drop/Latest/Error | 操作符链式组合 | | RxJava 2+ | BackpressureStrategy.BUFFER/DROP/LATEST/ERROR/MISSING | Flowable支持背压 | | RxJava 1 | Observable不支持背压→Flowable出现 | 历史包袱 |

6. 底层机制

  • Reactor通过Subscription.request(n)实现请求驱动
  • 生产者每次onNext()前检查当前requested count是否>0
  • 请求是累加的(request(3) + request(2) = 5)
  • 操作符链会逐级传递request请求