第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定
已解決430363個(gè)問題,去搜搜看,總會(huì)有你想問的

如何在Reactor中進(jìn)行多線程文件處理

如何在Reactor中進(jìn)行多線程文件處理

當(dāng)年話下 2024-01-25 22:05:10
我正在嘗試使用 Reactor 的 Flux 并行處理多個(gè)文件。主要工作負(fù)載發(fā)生在對(duì) Flux 的調(diào)用中flatMap,然后對(duì) Flux 進(jìn)行轉(zhuǎn)換和過濾。每當(dāng)我嘗試訂閱生成的 Flux 時(shí),主線程都會(huì)在收到任何值之前退出。Flux.fromStream(Files.list(Paths.get("directory"))    .flatMap(path -> {         return Flux.create(sink -> {            try (                RandomAccessFile file = new RandomAccessFile(new File(path), "r");                FileChannel fileChannel = file.getChannel()            ) {                // Process file into tokens                sink.next(new Token(".."));            } catch (IOException e) {                sink.error(e);            } finally {                sink.complete();            }        }).subscribeOn(Schedulers.boundedElastic());    })    .map(token -> /* Transform tokens */)    .filter(token -> /* Filter tokens*/)    .subscribe(token -> /* Store tokens in list */)我希望在列表中找到處理管道的輸出,但程序立即退出。首先我想知道我是否正確使用 Flux 類,其次我如何等待訂閱調(diào)用完成?
查看完整描述

1 回答

?
青春有我

TA貢獻(xiàn)1784條經(jīng)驗(yàn) 獲得超8個(gè)贊

我希望在列表中找到處理管道的輸出,但程序立即退出。


您那里的代碼在主線程上設(shè)置反應(yīng)鏈,然后...在主線程上不執(zhí)行任何其他操作。因此,主線程完成了其工作,并且由于boundedElastic()線程是守護(hù)線程,因此沒有其他線程阻止程序退出,因此它退出。


您可以通過一個(gè)更簡(jiǎn)單的示例看到相同的行為:


Flux<Integer> f = Flux.just(1, 2, 3, 4, 5)

            .delayElements(Duration.ofMillis(500));

f.subscribe(System.out::println);

您當(dāng)然可以調(diào)用newBoundedElastic("name", false)它使其成為非守護(hù)程序支持的調(diào)度程序,但是您必須跟蹤它并在完成后調(diào)用 dispose,所以它實(shí)際上只是反轉(zhuǎn)了問題(程序無限運(yùn)行,直到您處理掉調(diào)度程序。)


快速的“n”臟解決方案只是阻止最后一個(gè)元素作為Flux程序中的最后一行 - 所以如果我們添加:


f.blockLast();

...然后程序在退出之前等待最后一個(gè)元素被發(fā)出,我們就得到了我們想要的行為。


對(duì)于簡(jiǎn)單的概念證明來說,這很好。然而,它在“生產(chǎn)”代碼中并不理想。首先,“無阻塞”是反應(yīng)式代碼中的一般規(guī)則,因此,如果您有這樣的阻塞調(diào)用,則很難確定它是否是有意的。如果您添加了其他鏈并希望它們完成,則必須為每個(gè)鏈添加阻塞調(diào)用。這很混亂,而且不可持續(xù)。


更好的解決方案是使用CountDownLatch:


CountDownLatch cdl = new CountDownLatch(1);


Flux.just(1, 2, 3, 4, 5)

        .delayElements(Duration.ofMillis(500))

        .doFinally(s -> cdl.countDown())

        .subscribe(System.out::println);


cdl.await();

這樣做的優(yōu)點(diǎn)是不會(huì)顯式阻塞,并且還能夠同時(shí)處理多個(gè)發(fā)布者(如果將初始值設(shè)置為高于 1)。這也往往是我認(rèn)為通常推薦用于此類事情的方法 -因此,如果您想要最廣泛接受的解決方案,那么可能就是這樣。


然而,我傾向于支持Phaser所有需要等待多個(gè)發(fā)布者的示例,而不是只等待一個(gè) - 它的工作方式與 CountdownLatch 類似,但可以動(dòng)態(tài)地register()運(yùn)行deregister()。這意味著您可以創(chuàng)建單個(gè)移相器,然后根據(jù)需要輕松向其注冊(cè)多個(gè)發(fā)布者,而無需更改初始值,例如:


Phaser phaser = new Phaser(1);


Flux.just(1, 2, 3, 4, 5)

        .doOnSubscribe(s -> phaser.register())

        .delayElements(Duration.ofMillis(500))

        .doFinally(s -> phaser.arriveAndDeregister())

        .subscribe(System.out::println);


Flux.just(1, 2, 3, 4, 5, 6, 7, 8)

        .doOnSubscribe(s -> phaser.register())

        .delayElements(Duration.ofMillis(500))

        .doFinally(s -> phaser.arriveAndDeregister())

        .subscribe(System.out::println);


phaser.arriveAndAwaitAdvance();

(當(dāng)然,如果需要,您也可以將onSubscribe和doFinally邏輯包裝在單獨(dú)的方法中。)


查看完整回答
反對(duì) 回復(fù) 2024-01-25
  • 1 回答
  • 0 關(guān)注
  • 203 瀏覽
慕課專欄
更多

添加回答

舉報(bào)

0/150
提交
取消
微信客服

購(gòu)課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)