反应式编程中的Scheduler与线程模型对比
详细分析Reactor和RxJava中的调度器(Scheduler)和线程模型,包括Reactor的Schedulers工厂方法、RxJava的Scheduler类型、以及subscribeOn/publishOn/observeOn的线程切换机制。
回答
小字辈
1. Reactor调度器类型
// Reactor
Schedulers.immediate(); // 当前线程直接执行
Schedulers.single(); // 单线程复用(WrappedScheduledExecutor)
Schedulers.boundedElastic(); // 弹性线程池(可扩容)
Schedulers.parallel(); // 固定大小(CPU核数)
Schedulers.newBoundedElastic(capacity, queued, name); // 自定义
2. RxJava 3调度器
// RxJava 3
Schedulers.computation(); // CPU密集型(CPU核数线程)
Schedulers.io(); // I/O密集型(动态扩容,缓存线程)
Schedulers.newThread(); // 每个任务新线程
Schedulers.single(); // 单线程
Schedulers.trampoline(); // 队列执行(当前线程)
3. 线程切换操作符
Reactor:
Flux.just("data") // Thread: main
.map(String::toUpperCase) // Thread: main
.subscribeOn(Schedulers.boundedElastic()) // 上游执行线程←
.flatMap(d -> asyncCall(d)) // Thread: elastic-1
.publishOn(Schedulers.parallel()) // 下游执行线程→
.map(String::length) // Thread: parallel-1
.subscribe(); // Thread: parallel-1
- subscribeOn:影响上游操作(从源头到publishOn之前)
- publishOn:影响下游操作(从publishOn之后到subscribe)
- 可以多次使用
RxJava:
Flowable.just("data")
.subscribeOn(Schedulers.io()) // 上游执行线程
.observeOn(AndroidSchedulers.mainThread()) // 下游执行线程(Android)
.subscribe(data -> updateUI(data)); // 在主线程执行
4. 对比总结 | 特性 | Reactor | RxJava 3 | |------|------|------| | 设置上游 | subscribeOn(Scheduler) | subscribeOn(Scheduler) | | 设置下游 | publishOn(Scheduler) | observeOn(Scheduler) | | CPU任务 | Schedulers.parallel() | Schedulers.computation() | | I/O任务 | Schedulers.boundedElastic() | Schedulers.io() | | 自定义 | newBoundedElastic | from(Executor) / from(ExecutorService) |
5. 最佳实践
- CPU密集:
Schedulers.parallel()(线程数=CPU核数) - 阻塞I/O:
Schedulers.boundedElastic()(带容量限制) - 数据库R2DBC:
Schedulers.boundedElastic() - Android UI:
AndroidSchedulers.mainThread()(RxAndroid) - 事务边界:避免跨线程(使用
publishOn会影响事务上下文)