第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何在Reactor中進行多線程文件處理

如何在Reactor中進行多線程文件處理

當年話下 2024-01-25 22:05:10
我正在嘗試使用 Reactor 的 Flux 并行處理多個文件。主要工作負載發(fā)生在對 Flux 的調用中flatMap,然后對 Flux 進行轉換和過濾。每當我嘗試訂閱生成的 Flux 時,主線程都會在收到任何值之前退出。Flux.fromStream(Files.list(Paths.get("directory"))    .flatMap(path -> {         return Flux.create(sink -> {            try (                RandomAccessFile file = new RandomAccessFile(new File(path), "r");                FileChannel fileChannel = file.getChannel()            ) {                // Process file into tokens                sink.next(new Token(".."));            } catch (IOException e) {                sink.error(e);            } finally {                sink.complete();            }        }).subscribeOn(Schedulers.boundedElastic());    })    .map(token -> /* Transform tokens */)    .filter(token -> /* Filter tokens*/)    .subscribe(token -> /* Store tokens in list */)我希望在列表中找到處理管道的輸出,但程序立即退出。首先我想知道我是否正確使用 Flux 類,其次我如何等待訂閱調用完成?
查看完整描述

1 回答

?
青春有我

TA貢獻1784條經驗 獲得超8個贊

我希望在列表中找到處理管道的輸出,但程序立即退出。


您那里的代碼在主線程上設置反應鏈,然后...在主線程上不執(zhí)行任何其他操作。因此,主線程完成了其工作,并且由于boundedElastic()線程是守護線程,因此沒有其他線程阻止程序退出,因此它退出。


您可以通過一個更簡單的示例看到相同的行為:


Flux<Integer> f = Flux.just(1, 2, 3, 4, 5)

            .delayElements(Duration.ofMillis(500));

f.subscribe(System.out::println);

您當然可以調用newBoundedElastic("name", false)它使其成為非守護程序支持的調度程序,但是您必須跟蹤它并在完成后調用 dispose,所以它實際上只是反轉了問題(程序無限運行,直到您處理掉調度程序。)


快速的“n”臟解決方案只是阻止最后一個元素作為Flux程序中的最后一行 - 所以如果我們添加:


f.blockLast();

...然后程序在退出之前等待最后一個元素被發(fā)出,我們就得到了我們想要的行為。


對于簡單的概念證明來說,這很好。然而,它在“生產”代碼中并不理想。首先,“無阻塞”是反應式代碼中的一般規(guī)則,因此,如果您有這樣的阻塞調用,則很難確定它是否是有意的。如果您添加了其他鏈并希望它們完成,則必須為每個鏈添加阻塞調用。這很混亂,而且不可持續(xù)。


更好的解決方案是使用CountDownLatch:


CountDownLatch cdl = new CountDownLatch(1);


Flux.just(1, 2, 3, 4, 5)

        .delayElements(Duration.ofMillis(500))

        .doFinally(s -> cdl.countDown())

        .subscribe(System.out::println);


cdl.await();

這樣做的優(yōu)點是不會顯式阻塞,并且還能夠同時處理多個發(fā)布者(如果將初始值設置為高于 1)。這也往往是我認為通常推薦用于此類事情的方法 -因此,如果您想要最廣泛接受的解決方案,那么可能就是這樣。


然而,我傾向于支持Phaser所有需要等待多個發(fā)布者的示例,而不是只等待一個 - 它的工作方式與 CountdownLatch 類似,但可以動態(tài)地register()運行deregister()。這意味著您可以創(chuàng)建單個移相器,然后根據需要輕松向其注冊多個發(fā)布者,而無需更改初始值,例如:


Phaser phaser = new Phaser(1);


Flux.just(1, 2, 3, 4, 5)

        .doOnSubscribe(s -> phaser.register())

        .delayElements(Duration.ofMillis(500))

        .doFinally(s -> phaser.arriveAndDeregister())

        .subscribe(System.out::println);


Flux.just(1, 2, 3, 4, 5, 6, 7, 8)

        .doOnSubscribe(s -> phaser.register())

        .delayElements(Duration.ofMillis(500))

        .doFinally(s -> phaser.arriveAndDeregister())

        .subscribe(System.out::println);


phaser.arriveAndAwaitAdvance();

(當然,如果需要,您也可以將onSubscribe和doFinally邏輯包裝在單獨的方法中。)


查看完整回答
反對 回復 2024-01-25
  • 1 回答
  • 0 關注
  • 175 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號