1 回答

TA貢獻1784條經驗 獲得超8個贊
我希望在列表中找到處理管道的輸出,但程序立即退出。
您那里的代碼在主線程上設置反應鏈,然后...在主線程上不執(zhí)行任何其他操作。因此,主線程完成了其工作,并且由于boundedElastic()線程是守護線程,因此沒有其他線程阻止程序退出,因此它退出。
您可以通過一個更簡單的示例看到相同的行為:
Flux<Integer> f = Flux.just(1, 2, 3, 4, 5)
.delayElements(Duration.ofMillis(500));
f.subscribe(System.out::println);
您當然可以調用newBoundedElastic("name", false)它使其成為非守護程序支持的調度程序,但是您必須跟蹤它并在完成后調用 dispose,所以它實際上只是反轉了問題(程序無限運行,直到您處理掉調度程序。)
快速的“n”臟解決方案只是阻止最后一個元素作為Flux程序中的最后一行 - 所以如果我們添加:
f.blockLast();
...然后程序在退出之前等待最后一個元素被發(fā)出,我們就得到了我們想要的行為。
對于簡單的概念證明來說,這很好。然而,它在“生產”代碼中并不理想。首先,“無阻塞”是反應式代碼中的一般規(guī)則,因此,如果您有這樣的阻塞調用,則很難確定它是否是有意的。如果您添加了其他鏈并希望它們完成,則必須為每個鏈添加阻塞調用。這很混亂,而且不可持續(xù)。
更好的解決方案是使用CountDownLatch:
CountDownLatch cdl = new CountDownLatch(1);
Flux.just(1, 2, 3, 4, 5)
.delayElements(Duration.ofMillis(500))
.doFinally(s -> cdl.countDown())
.subscribe(System.out::println);
cdl.await();
這樣做的優(yōu)點是不會顯式阻塞,并且還能夠同時處理多個發(fā)布者(如果將初始值設置為高于 1)。這也往往是我認為通常推薦用于此類事情的方法 -因此,如果您想要最廣泛接受的解決方案,那么可能就是這樣。
然而,我傾向于支持Phaser所有需要等待多個發(fā)布者的示例,而不是只等待一個 - 它的工作方式與 CountdownLatch 類似,但可以動態(tài)地register()運行deregister()。這意味著您可以創(chuàng)建單個移相器,然后根據需要輕松向其注冊多個發(fā)布者,而無需更改初始值,例如:
Phaser phaser = new Phaser(1);
Flux.just(1, 2, 3, 4, 5)
.doOnSubscribe(s -> phaser.register())
.delayElements(Duration.ofMillis(500))
.doFinally(s -> phaser.arriveAndDeregister())
.subscribe(System.out::println);
Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
.doOnSubscribe(s -> phaser.register())
.delayElements(Duration.ofMillis(500))
.doFinally(s -> phaser.arriveAndDeregister())
.subscribe(System.out::println);
phaser.arriveAndAwaitAdvance();
(當然,如果需要,您也可以將onSubscribe和doFinally邏輯包裝在單獨的方法中。)
添加回答
舉報