Reactor线程切换解析

在 Reactor(Project Reactor)中,线程切换通常通过 publishOnsubscribeOn 这两个操作符来完成。它们的作用如下:

1. publishOn —— 控制下游线程

publishOn 适用于需要在不同的线程上执行流处理的情况。例如,数据流的某部分计算量大,可以切换到 Schedulers.parallel() 以提高效率。

Flux.range(1, 5)
    .map(i -> {
        System.out.println("First map on: " + Thread.currentThread().getName());
        return i;
    })
    .publishOn(Schedulers.parallel()) // 切换到并行线程池
    .map(i -> {
        System.out.println("Second map on: " + Thread.currentThread().getName());
        return i;
    })
    .subscribe();

运行示例输出:

First map on: main
First map on: main
First map on: main
First map on: main
First map on: main
Second map on: parallel-1
Second map on: parallel-2
Second map on: parallel-3
Second map on: parallel-4
Second map on: parallel-5

解释:


2. subscribeOn —— 控制上游线程

subscribeOn 影响的是 数据的订阅及生成线程,即上游(数据源)的执行线程。通常用于 I/O 操作,比如读取数据库或调用 HTTP API。

Flux.range(1, 5)
    .subscribeOn(Schedulers.boundedElastic()) // 影响上游
    .map(i -> {
        System.out.println("Mapping on: " + Thread.currentThread().getName());
        return i;
    })
    .subscribe();

运行示例输出:

Mapping on: boundedElastic-1
Mapping on: boundedElastic-1
Mapping on: boundedElastic-1
Mapping on: boundedElastic-1
Mapping on: boundedElastic-1

解释:


3. publishOn + subscribeOn 组合使用

如果同时使用 subscribeOnpublishOnsubscribeOn 影响的是数据的生成线程,而 publishOn 影响的是下游的执行线程

Flux.range(1, 5)
    .subscribeOn(Schedulers.boundedElastic()) // 影响上游
    .map(i -> {
        System.out.println("First map on: " + Thread.currentThread().getName());
        return i;
    })
    .publishOn(Schedulers.parallel()) // 影响下游
    .map(i -> {
        System.out.println("Second map on: " + Thread.currentThread().getName());
        return i;
    })
    .subscribe();

运行示例输出:

First map on: boundedElastic-1
First map on: boundedElastic-1
First map on: boundedElastic-1
First map on: boundedElastic-1
First map on: boundedElastic-1
Second map on: parallel-1
Second map on: parallel-2
Second map on: parallel-3
Second map on: parallel-4
Second map on: parallel-5

解释:


4. Schedulers 选择:

Reactor 提供了一些默认的 Scheduler


总结:

操作符 作用 影响范围
subscribeOn(Scheduler) 切换 上游 线程 数据源(订阅时的执行线程)
publishOn(Scheduler) 切换 下游 线程 影响它之后的操作

如果你需要:

  1. 控制数据生成(I/O 操作)线程 → 用 subscribeOn(Schedulers.boundedElastic())
  2. 控制数据处理(计算任务)线程 → 用 publishOn(Schedulers.parallel())

你在实际开发中会经常遇到 I/O 线程切换(比如 WebFlux 请求、数据库查询)和计算线程切换(比如数据处理、流式计算),掌握 subscribeOnpublishOn 的区别对优化性能很有帮助。