CodeWalk

反应式编程中的Scheduler与线程模型对比

作者:小字辈 · 2026-05-30 12:55

详细分析Reactor和RxJava中的调度器(Scheduler)和线程模型,包括Reactor的Schedulers工厂方法、RxJava的Scheduler类型、以及subscribeOn/publishOn/observeOn的线程切换机制。

回答

小字辈

1. Reactor调度器类型

// Reactor
Schedulers.immediate();        // 当前线程直接执行
Schedulers.single();           // 单线程复用(WrappedScheduledExecutor)
Schedulers.boundedElastic();   // 弹性线程池(可扩容)
Schedulers.parallel();         // 固定大小(CPU核数)
Schedulers.newBoundedElastic(capacity, queued, name); // 自定义

2. RxJava 3调度器

// RxJava 3
Schedulers.computation();      // CPU密集型(CPU核数线程)
Schedulers.io();               // I/O密集型(动态扩容,缓存线程)
Schedulers.newThread();        // 每个任务新线程
Schedulers.single();           // 单线程
Schedulers.trampoline();       // 队列执行(当前线程)

3. 线程切换操作符

Reactor

Flux.just("data")                              // Thread: main
    .map(String::toUpperCase)                    // Thread: main
    .subscribeOn(Schedulers.boundedElastic())    // 上游执行线程←
    .flatMap(d -> asyncCall(d))                  // Thread: elastic-1
    .publishOn(Schedulers.parallel())            // 下游执行线程→
    .map(String::length)                         // Thread: parallel-1
    .subscribe();                                // Thread: parallel-1
  • subscribeOn:影响上游操作(从源头到publishOn之前)
  • publishOn:影响下游操作(从publishOn之后到subscribe)
  • 可以多次使用

RxJava

Flowable.just("data")
    .subscribeOn(Schedulers.io())       // 上游执行线程
    .observeOn(AndroidSchedulers.mainThread())  // 下游执行线程(Android)
    .subscribe(data -> updateUI(data));  // 在主线程执行

4. 对比总结 | 特性 | Reactor | RxJava 3 | |------|------|------| | 设置上游 | subscribeOn(Scheduler) | subscribeOn(Scheduler) | | 设置下游 | publishOn(Scheduler) | observeOn(Scheduler) | | CPU任务 | Schedulers.parallel() | Schedulers.computation() | | I/O任务 | Schedulers.boundedElastic() | Schedulers.io() | | 自定义 | newBoundedElastic | from(Executor) / from(ExecutorService) |

5. 最佳实践

  • CPU密集:Schedulers.parallel()(线程数=CPU核数)
  • 阻塞I/O:Schedulers.boundedElastic()(带容量限制)
  • 数据库R2DBC:Schedulers.boundedElastic()
  • Android UI:AndroidSchedulers.mainThread()(RxAndroid)
  • 事务边界:避免跨线程(使用publishOn会影响事务上下文)