第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定
已解決430363個(gè)問(wèn)題,去搜搜看,總會(huì)有你想問(wèn)的

一旦找到任何匹配項(xiàng),如何停止并行流?

一旦找到任何匹配項(xiàng),如何停止并行流?

慕桂英3389331 2021-09-29 13:31:39
我正在嘗試找到與給定謂詞匹配的列表的第一個(gè)(任何)成員,如下所示:Item item = items.parallelStream()  .map(i -> i.doSomethingExpensive())  .filter(predicate)  .findAny()  .orElse(null);我希望一旦findAny()獲得匹配,它會(huì)立即返回,但情況似乎并非如此。相反,它似乎在返回之前等待 map 方法在大多數(shù)元素上完成。如何立即返回第一個(gè)結(jié)果并取消其他并行流?有沒(méi)有比使用諸如 的流更好的方法來(lái)做到這一點(diǎn)CompletableFuture?這是一個(gè)顯示行為的簡(jiǎn)單示例:private static void log(String msg) {    private static void log(String msg) {    SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");    System.out.println(sdf.format(new Date()) + " " + msg);}Random random = new Random();List<Integer> nums = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);Optional<Integer> num = nums.parallelStream()  .map(n -> {    long delay = Math.abs(random.nextLong()) % 10000;    log("Waiting on " + n + " for " + delay + " ms");    try { Thread.sleep(delay); }    catch (InterruptedException e) { System.err.println("Interruption error"); }    return n * n;  })  .filter(n -> n < 30)  .peek(n -> log("Found match: " + n))  .findAny();log("First match: " + num);日志輸出:14:52:27.061 Waiting on 9 for 2271 ms14:52:27.061 Waiting on 2 for 1124 ms14:52:27.061 Waiting on 13 for 547 ms14:52:27.061 Waiting on 4 for 517 ms14:52:27.061 Waiting on 1 for 1210 ms14:52:27.061 Waiting on 6 for 2646 ms14:52:27.061 Waiting on 0 for 4393 ms14:52:27.061 Waiting on 12 for 5520 ms14:52:27.581 Found match: 1614:52:27.582 Waiting on 3 for 5365 ms14:52:28.188 Found match: 414:52:28.275 Found match: 114:52:31.457 Found match: 014:52:32.950 Found match: 914:52:32.951 First match: Optional[0]一旦找到匹配項(xiàng)(在本例中為 16),findAny()不會(huì)立即返回,而是阻塞直到其余線(xiàn)程完成。在這種情況下,調(diào)用者在找到匹配后返回之前要額外等待 5 秒。
查看完整描述

3 回答

?
慕娘9325324

TA貢獻(xiàn)1783條經(jīng)驗(yàn) 獲得超4個(gè)贊

相反,它似乎在返回之前等待 map 方法在大多數(shù)元素上完成。


這是不正確的。


當(dāng)談到已經(jīng)被處理的元素時(shí),它將等待所有元素的完成,因?yàn)?Stream API 允許并發(fā)處理本質(zhì)上不是線(xiàn)程安全的數(shù)據(jù)結(jié)構(gòu)。在從終端操作返回之前,它必須確保所有潛在的并發(fā)訪(fǎng)問(wèn)都已完成。


在談?wù)撜麄€(gè)流時(shí),在 8 核機(jī)器上測(cè)試只有 14 個(gè)元素的流是不公平的。當(dāng)然,至少會(huì)有 8 個(gè)并發(fā)操作開(kāi)始,這就是全部。您正在通過(guò)使用findFirst()而不是為火焰添加燃料findAny(),因?yàn)檫@并不意味著按處理順序返回第一個(gè)找到的元素,而是按遇到順序返回第一個(gè)元素,即在您的示例中恰好為零,因此線(xiàn)程處理除第一個(gè)塊之外的其他塊不能假設(shè)他們的結(jié)果是正確答案,并且比使用 更愿意幫助處理其他候選人findAny()。


當(dāng)你使用


List<Integer> nums = IntStream.range(0, 200).boxed().collect(Collectors.toList());

Optional<Integer> num = nums.parallelStream()

        .map(n -> {

            long delay = ThreadLocalRandom.current().nextInt(10_000);

            log("Waiting on " + n + " for " + delay + " ms");

            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(delay));

            return n * n;

        })

        .filter(n -> n < 40_000)

        .peek(n -> log("Found match: " + n))

        .findAny();


log("First match: " + num);

盡管流元素的數(shù)量要多得多,但您將獲得相似數(shù)量的任務(wù)運(yùn)行完成。


請(qǐng)注意,CompletableFuture它也不支持中斷,因此我想到的唯一用于返回任何結(jié)果和取消其他作業(yè)的內(nèi)置功能是舊的ExecutorService.invokeAny.


要為其構(gòu)建映射和過(guò)濾功能,我們可以使用以下輔助函數(shù):


static <T,R> Callable<R> mapAndfilter(T t, Function<T,R> f, Predicate<? super R> p) {

    return () -> {

        R r = f.apply(t);

        if(!p.test(r)) throw new NoSuchElementException();

        return r;

    };

}

不幸的是,只有使用值或異常完成的選項(xiàng),因此我們必須對(duì)不匹配的元素使用異常。


然后我們可以像這樣使用它


ExecutorService es = ForkJoinPool.commonPool();

Integer result = es.invokeAny(IntStream.range(0, 100)

    .mapToObj(i -> mapAndfilter(i,

        n -> {

            long delay = ThreadLocalRandom.current().nextInt(10_000);

            log("Waiting on " + n + " for " + delay + " ms");

            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(delay));

            return n * n;

        },

        n -> n < 10_000))

    .collect(Collectors.toList()));


log("result: "+result);

它不僅會(huì)取消掛起的任務(wù),還會(huì)在不等待它們完成的情況下返回。


當(dāng)然,這意味著操作作業(yè)的源數(shù)據(jù)必須是不可變的或線(xiàn)程安全的。


查看完整回答
反對(duì) 回復(fù) 2021-09-29
?
九州編程

TA貢獻(xiàn)1785條經(jīng)驗(yàn) 獲得超4個(gè)贊

您可以使用此代碼來(lái)說(shuō)明 parallelStream 的工作原理:


final List<String> list = Arrays.asList("first", "second", "third", "4th", "5th", "7th", "8th", "9th", "10th", "11th", "12th", "13th");


    String result = list.parallelStream()

                        .map(s -> {

                            System.out.println("map: " + s);

                            return s;

                        })

                        .filter(s -> {

                            System.out.println("fiter: " + s);

                            return s.equals("8th");

                        })

                        .findFirst()

                        .orElse(null);


    System.out.println("result=" + result);

有兩種選擇可以實(shí)現(xiàn)您的目標(biāo),用過(guò)濾器停止昂貴的操作:


根本不要使用流,使用簡(jiǎn)單的 for 或增強(qiáng)的 for

先過(guò)濾,然后用昂貴的操作映射


查看完整回答
反對(duì) 回復(fù) 2021-09-29
  • 3 回答
  • 0 關(guān)注
  • 206 瀏覽
慕課專(zhuān)欄
更多

添加回答

舉報(bào)

0/150
提交
取消
微信客服

購(gòu)課補(bǔ)貼
聯(lián)系客服咨詢(xún)優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)