1 回答

TA貢獻(xiàn)1796條經(jīng)驗(yàn) 獲得超4個贊
這個人為的例子可能會更清楚:
Scheduler single = Schedulers.newSingle("single-scheduler");
Flux.just("Bob")
.flatMap(x -> {
System.out.println(String.format(
"Saving person from thread %s", Thread.currentThread().getName()));
return Mono.just(x).publishOn(Schedulers.elastic());
})
.flatMap(x -> {
System.out.println(String.format(
"Finding person from thread %s", Thread.currentThread().getName()));
return Mono.just(x).publishOn(Schedulers.elastic());
})
.flatMap(x -> {
System.out.println(String.format(
"Deleting person from thread %s", Thread.currentThread().getName()));
return Mono.just(x).publishOn(Schedulers.elastic());
})
.subscribeOn(single)
.subscribe(aVoid -> System.out.println(String.format(
"Subscription from thread %s", Thread.currentThread().getName())));
這將給出類似的東西:
Saving person from thread single-scheduler-1
Finding person from thread elastic-2
Deleting person from thread elastic-3
Subscription from thread elastic-4
或者,換句話說,您的反應(yīng)式存儲庫沒有在同一個調(diào)度程序上發(fā)布,這就是您看到您所做的行為的原因?!癠p until the next occurrence of publishOn()”并不意味著下次您的代碼調(diào)用publishOn()- 它也可以在您的任何調(diào)用中的任何發(fā)布者中flatMap(),您將無法控制。
添加回答
舉報