線程與調(diào)度:Java反應(yīng)式編程
首先,我要感谢所有读者、访问者和订阅者持续的支持和参与。在这篇博客文章中,我们将探讨Java响应式编程中的线程和调度器的概念。理解这些概念对于编写高效且响应迅速的响应式应用来说非常重要。我们将通过不同场景来讲解,并给出每个概念的实例。
简介
在反应式编程中,线程和调度器在控制任务执行方面非常关键。在反应式编程中,我们通常会处理异步数据流,而管理这些数据流在不同线程上的执行对于实现并发性和并行性至关重要。
发布者/订阅者 — 默认线程示例
通常情况下,反应式编程中的发布者和订阅者通常在同一线程上运行。我们来看一个简单的例子。
例子:
例如:
import reactor.core.publisher.Flux;
public class DefaultThreadDemo {
public static void main(String[] args) {
Flux<Integer> flux = Flux.range(1, 5);
flux.subscribe(
data -> System.out.println("收到: " + data + " 线程: " + Thread.currentThread().getName()),
err -> System.err.println("异常: " + err),
() -> System.out.println("完成在: " + Thread.currentThread().getName())
);
}
}
收到: 1,主线程: main
收到: 2,主线程: main
收到: 3,主线程: main
收到: 4,主线程: main
收到: 5,主线程: main
完成,主线程: main
在这里,你可以注意到所有操作都是在主线程上进行的。
zh:粉丝
根据我们的需求,订阅方们是那些消费发布者发出的数据的那些人。我们可以拥有不同类型订阅方,以满足我们的需求。
点击这里订阅
subscribeOn
指定了订阅发生所在的调度器。它会影响到上游的操作。我们来看一个例子:
例子:
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class SubscribeOnDemo {
public static void main(String[] args) {
Flux<Integer> flux = Flux.range(1, 5)
.map(i -> {
System.out.println("映射1: " + i + " 线程: " + Thread.currentThread().getName());
return i;
})
.subscribeOn(Schedulers.boundedElastic())
.map(i -> {
System.out.println("映射2: " + i + " 线程: " + Thread.currentThread().getName());
return i;
});
flux.subscribe(
data -> System.out.println("收到: " + data + " 线程: " + Thread.currentThread().getName())
);
// 休眠以便查看输出结果
try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
}
}
输出:
Map1: 1 在線程:boundedElastic-1
Map2: 1 在線程:boundedElastic-1
Map1: 2 在線程:boundedElastic-1
Map2: 2 在線程:boundedElastic-1
Map1: 3 在線程:boundedElastic-1
Map2: 3 在線程:boundedElastic-1
Map1: 4 在線程:boundedElastic-1
Map2: 4 在線程:boundedElastic-1
Map1: 5 在線程:boundedElastic-1
Map2: 5 在線程:boundedElastic-1
接收:1 在線程:boundedElastic-1
接收:2 在線程:boundedElastic-1
接收:3 在線程:boundedElastic-1
接收:4 在線程:boundedElastic-1
接收:5 在線程:boundedElastic-1
在这里,所有的操作都在 boundedElastic
线程上执行,因为我们用了 subscribeOn
。
试用订阅:
咱们再来看看 subscribeOn
操作符的例子。
例子:
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
/**
* 多订阅示例
*/
public class MultipleSubscribeOnDemo {
/**
* 主函数入口,用于执行示例代码
*/
public static void main(String[] args) {
Flux<Integer> flux = Flux.range(1, 5)
.subscribeOn(Schedulers.boundedElastic())
.map(i -> {
System.out.println("映射1: " + i + " 在线程: " + Thread.currentThread().getName());
return i;
})
.subscribeOn(Schedulers.parallel())
.map(i -> {
System.out.println("映射2: " + i + " 在线程: " + Thread.currentThread().getName());
return i;
});
flux.subscribe(
data -> System.out.println("接收到: " + data + " 在线程: " + Thread.currentThread().getName())
);
// 为了能看到输出结果,暂停一下
try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
}
}
结果:
Map1: 1 在 parallel-1 线程中
Map2: 1 在 parallel-1 线程中
Map1: 2 在 parallel-1 线程中
Map2: 2 在 parallel-1 线程中
Map1: 3 在 parallel-1 线程中
Map2: 3 在 parallel-1 线程中
Map1: 4 在 parallel-1 线程中
Map2: 4 在 parallel-1 线程中
Map1: 5 在 parallel-1 线程中
Map2: 5 在 parallel-1 线程中
收到:1, 2, 3, 4, 5 在 parallel-1 线程中
在这种情况下,第二个 subscribeOn
调用取代了第一个,所有操作都在 parallel
线程上执行。
多处订阅
如果我们使用多个subscribeOn
操作符,只有第一个会被执行。后续的subscribeOn
操作将被忽略。
情景: 你想展示 subscribeOn
如何影响响应式管道不同部分的执行情况。
例如:
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class MultipleSubscribeOnDemo {
public static void main(String[] args) {
Flux<Integer> flux = Flux.range(1, 5)
.subscribeOn(Schedulers.parallel())
.map(i -> {
System.out.println("打印 \"Map1: " + i + " 当前线程名为: " + Thread.currentThread().getName() + "\"");
return i;
})
.subscribeOn(Schedulers.single())
.map(i -> {
System.out.println("打印 \"Map2: " + i + " 当前线程名为: " + Thread.currentThread().getName() + "\"");
return i;
});
flux.subscribe(
data -> System.out.println("接收数据: " + data + " 当前线程名为: " + Thread.currentThread().getName())
);
// 订阅多个调度器实例以观察数据处理的线程
try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
}
}
输出:
Map1: 1 在 thread 中: parallel-1
Map1: 2 在 thread 中: parallel-2
Map1: 3 在 thread 中: parallel-3
Map1: 4 在 thread 中: parallel-4
Map1: 5 在 thread 中: parallel-5
Map2: 1 在 thread 中: single-1
Map2: 2 在 thread 中: single-1
Map2: 3 在 thread 中: single-1
Map2: 4 在 thread 中: single-1
Map2: 5 在 thread 中: single-1
接收: 1 在线程中: single-1
接收: 2 在线程中: single-1
接收: 3 在线程中: single-1
接收: 4 在线程中: single-1
接收: 5 在线程中: single-1
说明:
来看看这个例子:
- 首先调用
subscribeOn(Schedulers.parallel())
,这会使得map1
的执行在parallel
调度器上进行。 - 随后调用
subscribeOn(Schedulers.single())
,使得map2
的执行环境切换到single
调度器。
然而,在 Reactor 和大多数反应式框架中,只有第一个 subscribeOn
影响实际的订阅上下文环境。后续的 subscribeOn
调用不会改变上下文。在上述示例中,尽管调用了第二个 subscribeOn(Schedulers.single())
,map1
和 map2
操作的调度仍然使用 parallel
调度器。
这种做法确保了一致性,同时避免了在反应式编程中的混淆。因为订阅上下文应在反应式链的开始理想地只设置一次。
调度器即时
Schedulers.immediate()
在当前线程上,执行任务。它在测试中很有用,或需要立即运行任务时。
例子:
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class ImmediateSchedulerDemo {
public static void main(String[] args) {
Flux<Integer> flux = Flux.range(1, 5)
.map(i -> {
System.out.println("映射1:" + i + " 在线程:" + Thread.currentThread().getName());
return i;
})
.subscribeOn(Schedulers.immediate())
.map(i -> {
System.out.println("映射2:" + i + " 在线程:" + Thread.currentThread().getName());
return i;
});
flux.subscribe(
data -> System.out.println("接收到:" + data + " 在线程:" + Thread.currentThread().getName())
);
}
}
这是一个演示 ImmediateScheduler 的示例。
结果显示为:
Map1: 1 主线程
Map2: 1 主线程
Map1: 2 主线程
Map2: 2 主线程
Map1: 3 主线程
Map2: 3 主线程
Map1: 4 主线程
Map2: 4 主线程
Map1: 5 主线程
Map2: 5 主线程
接收: 1 主线程
接收: 2 主线程
接收: 3 主线程
接收: 4 主线程
接收: 5 主线程
所有操作都在主线程上。
计划程式 — 虚拟线程
Schedulers.virtual()
使用 Java 虚拟线程来实现轻量级并发。Java 虚拟线程是 Project Loom 的一个功能。
例子:
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class VirtualThreadDemo {
public static void main(String[] args) {
Flux<Integer> flux = Flux.range(1, 5)
.map(i -> {
System.out.println("Map1: " + i + " 在 线程: " + Thread.currentThread().getName());
return i;
})
.subscribeOn(Schedulers.virtual())
.map(i -> {
System.out.println("Map2: " + i + " 在 线程: " + Thread.currentThread().getName());
return i;
});
flux.subscribe(
data -> System.out.println("接收到 " + data + ",此时线程名称为: " + Thread.currentThread().getName())
);
// 休眠1秒以查看输出结果
try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
}
}
输出:
Map1: 1, 在 线程 中: 虚拟线程-1
Map2: 1, 在 线程 中: 虚拟线程-1
Map1: 2, 在 线程 中: 虚拟线程-1
Map2: 2, 在 线程 中: 虚拟线程-1
Map1: 3, 在 线程 中: 虚拟线程-1
Map2: 3, 在 线程 中: 虚拟线程-1
Map1: 4, 在 线程 中: 虚拟线程-1
Map2: 4, 在 线程 中: 虚拟线程-1
Map1: 5, 在 线程 中: 虚拟线程-1
Map2: 5, 在 线程 中: 虚拟线程-1
收到:1, 在 线程 中: 虚拟线程-1
收到:2, 在 线程 中: 虚拟线程-1
收到:3, 在 线程 中: 虚拟线程-1
收到:4, 在 线程 中: 虚拟线程-1
收到:5, 在 线程 中: 虚拟线程-1
更多关于调度器的内容
调度器用来控制执行上下文。除了Schedulers.immediate()
和Schedulers.virtual()
之外,还有其它几个调度器,比如:
Schedulers.parallel()
: 用于并行处理。Schedulers.single()
: 用于单线程处理。Schedulers.boundedElastic()
: 用于阻塞任务和I/O限制性任务。
发布时间
publishOn
指定了下游操作应在哪个调度器上执行。它会影响后续管道中的操作。
例子:
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class PublishOnDemo {
public static void main(String[] args) {
Flux<Integer> flux = Flux.range(1, 5)
.map(i -> {
System.out.println("Map1: " + i + " on thread: " + Thread.currentThread().getName());
return i;
})
.publishOn(Schedulers.boundedElastic())
.map(i -> {
System.out.println("Map2: " + i + " on thread: " + Thread.currentThread().getName());
return i;
});
flux.subscribe(
data -> System.out.println("Received: " + data + " on thread: " + Thread.currentThread().getName())
);
// Sleep to see the output
try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
}
}
结果如下:
映射1: 1, 线程为: main
映射2: 1, 线程为: boundedElastic-1
映射1: 2, 线程为: main
映射2: 2, 线程为: boundedElastic-1
映射1: 3, 线程为: main
映射2: 3, 线程为: boundedElastic-1
映射1: 4, 线程为: main
映射2: 4, 线程为: boundedElastic-1
映射1: 5, 线程为: main
映射2: 5, 线程为: boundedElastic-1
接收到: 1, 线程为: boundedElastic-1
接收到: 2, 线程为: boundedElastic-1
接收到: 3, 线程为: boundedElastic-1
接收到: 4, 线程为: boundedElastic-1
接收到: 5, 线程为: boundedElastic-1
发布时间为:演示版
我们再来看一个关于 publishOn
操作符的例子
例如:
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class MultiplePublishOnDemo {
public static void main(String[] args) {
Flux<Integer> flux = Flux.range(1, 5)
.publishOn(Schedulers.boundedElastic())
.map(i -> {
System.out.println("Map1: " + i + " on thread: " + Thread.currentThread().getName());
return i;
})
.publishOn(Schedulers.parallel())
.map(i -> {
System.out.println("Map2: " + i + " on thread: " + Thread.currentThread().getName());
return i;
});
flux.subscribe(
data -> System.out.println("Received: " + data + " on thread: " + Thread.currentThread().getName())
);
// 让程序暂停以便看到输出
try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
}
}
没有输出
Map1: 1 线程: boundedElastic-1
Map2: 1 线程: parallel-1
Map1: 2 线程: boundedElastic-1
Map2: 2 线程: parallel-1
Map1: 3 线程: boundedElastic-1
Map2: 3 线程: parallel-1
Map1: 4 线程: boundedElastic-1
Map2: 4 线程: parallel-1
Map1: 5 线程: boundedElastic-1
Map2: 5 线程: parallel-1
接收到: 1 线程: parallel-1
接收到: 2 线程: parallel-1
接收到: 3 线程: parallel-1
接收到: 4 线程: parallel-1
接收到: 5 线程: parallel-1
在这种情况下,第二个 publishOn
调用改变了执行线程,而下游的操作则在名为 parallel
的线程上执行。
阻止事件循环 — 问题解决
在响应式编程中,阻塞事件循环线程是一个常见的问题。使用合适的调度器可以避免这个问题。
例子:
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class BlockingEventLoopFix {
public static void main(String[] args) {
Flux<Integer> flux = Flux.range(1, 5)
.map(i -> {
System.out.println("正在处理: " + i + " 线程: " + Thread.currentThread().getName());
try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
return i;
})
.publishOn(Schedulers.boundedElastic());
flux.subscribe(
data -> System.out.println("收到: " + data + " 线程: " + Thread.currentThread().getName())
);
// 休眠以便查看输出
try { Thread.sleep(6000); } catch (InterruptedException e) { e.printStackTrace(); }
}
}
如下:
处理:1 线程是:boundedElastic-1
处理:2 线程是:boundedElastic-1
处理:3 线程是:boundedElastic-1
处理:4 线程是:boundedElastic-1
处理:5 线程是:boundedElastic-1
已接收:1 线程是:boundedElastic-1
已接收:2 线程是:boundedElastic-1
已接收:3 线程是:boundedElastic-1
已接收:4 线程是:boundedElastic-1
已接收:5 线程是:boundedElastic-1
在这里,处理是在 boundedElastic
调度器上完成的,这样就避免了主线程被卡住。
发布和订阅在
结合 publishOn
和 subscribeOn
的使用让我们对线程进行更精细的控制。
例子:
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
// 本示例展示了如何使用Reactor库中的publishOn和subscribeOn方法来调整线程调度.
public class PublishOnSubscribeOnDemo {
public static void main(String[] args) {
Flux<Integer> flux = Flux.range(1, 5)
.subscribeOn(Schedulers.boundedElastic()) // 订阅在有界弹性调度器上
.map(i -> {
System.out.println("映射1: " + i + " 在线程上: " + Thread.currentThread().getName());
return i;
})
.publishOn(Schedulers.parallel()) // 发布在并行调度器上
.map(i -> {
System.out.println("映射2: " + i + " 在线程上: " + Thread.currentThread().getName());
return i;
});
flux.subscribe(
data -> System.out.println("接收: " + data + " 在线程上: " + Thread.currentThread().getName())
);
// 睡眠以查看输出
try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
}
}
输出:
Map1:1在 线程中:boundedElastic-1
Map2:1在 线程中:parallel-1
Map1:2在 线程中:boundedElastic-1
Map2:2在 线程中:parallel-1
Map1:3在 线程中:boundedElastic-1
Map2:3在 线程中:parallel-1
Map1:4在 线程中:boundedElastic-1
Map2:4在 线程中:parallel-1
Map1:5在 线程中:boundedElastic-1
Map2:5在 线程中:parallel-1
接收到:1在 线程中:parallel-1
接收到:2在 线程中:parallel-1
接收到:3在 线程中:parallel-1
接收到:4在 线程中:parallel-1
接收到:5在 线程中:parallel-1
并行处理
并行执行对于可以并行处理的任务很有用。我们来看一个例子:
例子:
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class ParallelExecutionDemo {
public static void main(String[] args) {
Flux.range(1, 10)
.parallel()
.runOn(Schedulers.parallel())
.map(i -> {
System.out.println("正在处理: " + i + " 线程: " + Thread.currentThread().getName());
return i;
})
.sequential()
.subscribe(
data -> System.out.println("收到: " + data + " 线程: " + Thread.currentThread().getName())
);
// 为了让输出可见而暂停
try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
}
}
输出:
正在处理:1 线程 parallel-1
正在处理:2 线程 parallel-2
正在处理:3 线程 parallel-3
正在处理:4 线程 parallel-4
正在处理:5 线程 parallel-5
正在处理:6 线程 parallel-6
正在处理:7 线程 parallel-7
正在处理:8 线程 parallel-8
正在处理:9 线程 parallel-9
正在处理:10 线程 parallel-10
已接收:1 线程 parallel-1
已接收:2 线程 parallel-2
已接收:3 线程 parallel-3
已接收:4 线程 parallel-4
已接收:5 线程 parallel-5
已接收:6 线程 parallel-6
已接收:7 线程 parallel-7
已接收:8 线程 parallel-8
已接收:9 线程 parallel-9
已接收:10 线程 parallel-10
在这个例子中,任务通过并行线程来执行,显著提升了这些并发任务的执行效率。
总结
在这篇帖子中,我们探讨了 Java 响应式编程中的各种线程和调度的概念。我们涵盖了:
- 默认的线程处理方式
- 不同的订阅者类型
subscribeOn
及其效果publishOn
及其效果- 使用
ImmediateScheduler
,VirtualScheduler
, 以及其他调度器 - 解决阻塞事件循环问题
- 结合
publishOn
和subscribeOn
- 实现并行处理
了解这些概念能帮你写出更高效、更快速响应的响应式应用程序。
作业:
尝试,实现一个响应式流,该流并行处理整数的列表,并,使用合适的调度器过滤出偶数。
解决
这里提供了一个可能的解决方案来完成任务:
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class AssignmentSolution {
public static void main(String[] args) {
Flux.range(1, 20)
.parallel()
.runOn(Schedulers.parallel())
.filter(i -> i % 2 != 0)
.sequential()
.subscribe(
data -> System.out.println("接收:" + data + " 在线程:" + Thread.currentThread().getName())
);
// 为了观察输出结果暂停一下
try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
}
}
输出:
收到: 1, 线程名: parallel-1
收到: 3, 线程名: parallel-1
收到: 5, 线程名: parallel-2
收到: 7, 线程名: parallel-3
收到: 9, 线程名: parallel-4
收到: 11, 线程名: parallel-5
收到: 13, 线程名: parallel-6
收到: 15, 线程名: parallel-7
收到: 17, 线程名: parallel-8
收到: 19, 线程名: parallel-9
在此解决方案中,整数被并行处理,筛选出奇数,最后输出。
谢谢阅读!如果您有任何疑问或意见,请在下方留言。希望您编程愉快!
共同學(xué)習(xí),寫(xiě)下你的評(píng)論
評(píng)論加載中...
作者其他優(yōu)質(zhì)文章