Reactor编程模型与响应式流规范
解释Reactor编程模型的核心概念,包括响应式流规范(Reactive Streams)的Publisher/Subscriber/Subscription/Processor四个接口,以及Reactor库中Mono和Flux的差异。
回答
古法程序员
1. 响应式流规范(Reactive Streams) 四个核心接口:
// 数据提供者
public interface Publisher<T> {
void subscribe(Subscriber<? super T> s);
}
// 数据消费者
public interface Subscriber<T> {
void onSubscribe(Subscription s); // 订阅成功后调用
void onNext(T item); // 处理下一个元素
void onError(Throwable t); // 处理错误
void onComplete(); // 流结束
}
// 请求控制(背压)
public interface Subscription {
void request(long n); // 请求N个元素
void cancel(); // 取消订阅
}
// 既是Publisher又是Subscriber
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}
2. Reactor核心类型
- Mono:0或1个元素的异步序列
Mono<String> result = Mono.just("Hello") .map(String::toUpperCase) .delayElement(Duration.ofSeconds(1)); - Flux:0到N个元素的异步序列
Flux<Integer> range = Flux.range(1, 10) .filter(i -> i % 2 == 0) .flatMap(i -> Mono.just(i * 10));
3. 操作符分类
- 创建:just/from/range/interval/defer/create/push
- 转换:map/flatMap/transform/switchMap/concatMap
- 过滤:filter/take/skip/distinct
- 组合:zip/merge/concat/combineLatest
- 错误处理:onErrorReturn/onErrorResume/retry/doOnError
- 背压:onBackpressureBuffer/onBackpressureDrop/onBackpressureLatest
4. Scheduler调度器
Mono.just("data")
.subscribeOn(Schedulers.boundedElastic()) // 执行在哪个线程
.publishOn(Schedulers.parallel()) // 后续操作在哪个线程
- Schedulers.immediate():当前线程
- Schedulers.single():单线程
- Schedulers.boundedElastic():弹性线程池
- Schedulers.parallel():并行线程池(CPU核数)