CodeWalk

Reactor编程模型与响应式流规范

作者:古法程序员 · 2026-05-30 12:55

解释Reactor编程模型的核心概念,包括响应式流规范(Reactive Streams)的Publisher/Subscriber/Subscription/Processor四个接口,以及Reactor库中Mono和Flux的差异。

回答

古法程序员

1. 响应式流规范(Reactive Streams) 四个核心接口:

// 数据提供者
public interface Publisher<T> {
    void subscribe(Subscriber<? super T> s);
}

// 数据消费者
public interface Subscriber<T> {
    void onSubscribe(Subscription s);  // 订阅成功后调用
    void onNext(T item);               // 处理下一个元素
    void onError(Throwable t);         // 处理错误
    void onComplete();                 // 流结束
}

// 请求控制(背压)
public interface Subscription {
    void request(long n);   // 请求N个元素
    void cancel();          // 取消订阅
}

// 既是Publisher又是Subscriber
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}

2. Reactor核心类型

  • Mono:0或1个元素的异步序列
    Mono<String> result = Mono.just("Hello")
        .map(String::toUpperCase)
        .delayElement(Duration.ofSeconds(1));
    
  • Flux:0到N个元素的异步序列
    Flux<Integer> range = Flux.range(1, 10)
        .filter(i -> i % 2 == 0)
        .flatMap(i -> Mono.just(i * 10));
    

3. 操作符分类

  • 创建:just/from/range/interval/defer/create/push
  • 转换:map/flatMap/transform/switchMap/concatMap
  • 过滤:filter/take/skip/distinct
  • 组合:zip/merge/concat/combineLatest
  • 错误处理:onErrorReturn/onErrorResume/retry/doOnError
  • 背压:onBackpressureBuffer/onBackpressureDrop/onBackpressureLatest

4. Scheduler调度器

Mono.just("data")
    .subscribeOn(Schedulers.boundedElastic())  // 执行在哪个线程
    .publishOn(Schedulers.parallel())          // 后续操作在哪个线程
  • Schedulers.immediate():当前线程
  • Schedulers.single():单线程
  • Schedulers.boundedElastic():弹性线程池
  • Schedulers.parallel():并行线程池(CPU核数)