Reactor操作符
Reactor 是一个基于 Java 8 的响应式编程库,它提供了丰富的操作符来处理反应式流(Reactor中的Mono
和Flux
)。以下是一些常用的Reactor操作符及其用途:
map
map
- 对Mono
或Flux
中的每个元素应用一个函数,并将结果发出。
Flux.just("1", "2", "3")
.maptoUpperCase
.subscribeprintln; // 输出大写字符
flatMap
flatMap
- 与map
类似,但可以返回一个Mono
或Flux
,并将结果连接起来。
Flux.just("1", "2", "3")
.flatMap(s -> Flux.fromArray(s.split("")))
.subscribeprintln; // 输出单个字符
filter
filter
- 根据给定的谓词函数过滤Flux
中的元素。
Flux.just(1, 2, 3, 4, 5)
.filter(i -> i % 2 == 0)
.subscribeprintln; // 输出偶数
take
take
- 从Flux
中取出前N个元素。
Flux.range(1, 10)
.take(5)
.subscribeprintln; // 只输出前5个数字
skip
skip
- 跳过Flux
中的前N个元素。
Flux.range(1, 10)
.skip(5)
.subscribeprintln; // 跳过前5个数字
reduce
reduce
- 将Flux
中的所有元素组合成一个单一的值。
Flux.range(1, 3)
.reduce((a, b) -> a + b)
.subscribeprintln; // 输出和
concat
concat
- 将两个Flux
序列连接起来,第一个Flux
完成后,第二个Flux
开始发出元素。
Flux.concat(Flux.just("A"), Flux.just("B", "C"))
.subscribeprintln; // 输出ABC
merge
merge
- 将多个Flux
合并成一个,但有背压支持。
Flux.merge(Flux.just("A"), Flux.just("B", "C"))
.subscribeprintln; // 输出ABC,无特定顺序
zip
zip
- 将多个Mono
或Flux
的元素组合在一起,创建一个包含所有元素的集合。
Mono.just("A")
.zipWith(Mono.just("B"), (a, b) -> a + b)
.subscribeprintln; // 输出AB
delay
delay
- 延迟Mono
或Flux
的发出,可以用于控制流的时序。
Flux.just("Hello")
.delayElements(Duration.ofSeconds(1))
.subscribeprintln; // 延迟1秒后输出
Retry
retry
- 当Mono
或Flux
发生错误时,重新尝试操作。
Mono.justOrEmpty("Error")
.retry()
.subscribeprintln; // 发生错误时重试
doOnNext
doOnNext
- 在Flux
的每个元素被订阅者接收之前执行一个操作。
Flux.just("1", "2", "3")
.doOnNext(s -> System.out.println("Emitting: " + s))
.subscribe(); // 输出每个元素的发射信息
switchMap
switchMap
- 将Flux
中的每个元素转换为一个新的Mono
或Flux
,并将这些Mono
或Flux
连接起来,但与flatMap
不同,它会取消之前没有完成的Mono
或Flux
。
Flux.just(1, 2, 3)
.switchMap(i -> Mono.just("String " + i))
.subscribeprintln; // 输出每个转换后的字符串
flatMapSequential
flatMapSequential
- 类似于flatMap
,但会顺序执行每个Mono
或Flux
,而不是并行。
Flux.just(1, 2, 3)
.flatMapSequential(i -> Mono.just("String " + i))
.subscribeprintln; // 顺序输出每个转换后的字符串
publishOn
publishOn
- 指定Mono
或Flux
发出元素时使用的Scheduler
,这有助于控制并发和执行环境。
Mono.just("Hello")
.publishOn(Schedulers.elastic())
.subscribeprintln; // 在指定的Scheduler上发出元素
cache
cache
- 缓存Flux
的元素,使得后续的订阅可以直接获取缓存的元素,而不是重新发出。
Flux.just("1", "2", "3")
.cache()
.subscribeprintln; // 输出元素并缓存
subscribeAgain(); // 再次订阅时,直接获取缓存的元素
distinct
distinct
- 根据给定的键提取函数,只发出不同的元素。
Flux.just(1, 2, 2, 3, 3, 1)
.distinct(i -> i)
.subscribeprintln; // 只输出唯一的元素
takeWhile
takeWhile
- 只要满足给定的谓词函数,就发出Flux
中的元素。
Flux.range(1, 5)
.takeWhile(i -> i < 4)
.subscribeprintln; // 输出小于4的元素
skipWhile
skipWhile
- 跳过Flux
中满足给定的谓词函数的元素,直到遇到不满足的元素为止。
Flux.range(1, 5)
.skipWhile(i -> i < 3)
.subscribeprintln; // 跳过小于3的元素
timeout
timeout
- 如果在指定的时间内没有发出元素,就发出一个错误信号。
Mono.just("Hello")
.delayElement(Duration.ofSeconds(5))
.timeout(Duration.ofSeconds(1))
.subscribeprintln, System.err::println; // 超时后输出错误信息
debounce
debounce
- 过滤掉连续发出的元素,只发出经过指定时间间隔后的第一个元素。
Flux.interval(Duration.ofMillis(100))
.debounce(Duration.ofMillis(200))
.take(5)
.subscribeprintln; // 每200毫秒输出一个元素,共输出5个
groupBy
groupBy
- 根据给定的函数将Flux
的元素分组,每个分组都是一个Flux
。
Flux.just("one", "two", "three", "four")
.groupBylength
.flatMap(group -> group.map(s -> "Group " + group.key() + " : " + s))
.subscribeprintln; // 按长度分组并输出