第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

一旦找到任何匹配項,如何停止并行流?

一旦找到任何匹配項,如何停止并行流?

慕桂英3389331 2021-09-29 13:31:39
我正在嘗試找到與給定謂詞匹配的列表的第一個(任何)成員,如下所示:Item item = items.parallelStream()  .map(i -> i.doSomethingExpensive())  .filter(predicate)  .findAny()  .orElse(null);我希望一旦findAny()獲得匹配,它會立即返回,但情況似乎并非如此。相反,它似乎在返回之前等待 map 方法在大多數(shù)元素上完成。如何立即返回第一個結果并取消其他并行流?有沒有比使用諸如 的流更好的方法來做到這一點CompletableFuture?這是一個顯示行為的簡單示例:private static void log(String msg) {    private static void log(String msg) {    SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");    System.out.println(sdf.format(new Date()) + " " + msg);}Random random = new Random();List<Integer> nums = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);Optional<Integer> num = nums.parallelStream()  .map(n -> {    long delay = Math.abs(random.nextLong()) % 10000;    log("Waiting on " + n + " for " + delay + " ms");    try { Thread.sleep(delay); }    catch (InterruptedException e) { System.err.println("Interruption error"); }    return n * n;  })  .filter(n -> n < 30)  .peek(n -> log("Found match: " + n))  .findAny();log("First match: " + num);日志輸出:14:52:27.061 Waiting on 9 for 2271 ms14:52:27.061 Waiting on 2 for 1124 ms14:52:27.061 Waiting on 13 for 547 ms14:52:27.061 Waiting on 4 for 517 ms14:52:27.061 Waiting on 1 for 1210 ms14:52:27.061 Waiting on 6 for 2646 ms14:52:27.061 Waiting on 0 for 4393 ms14:52:27.061 Waiting on 12 for 5520 ms14:52:27.581 Found match: 1614:52:27.582 Waiting on 3 for 5365 ms14:52:28.188 Found match: 414:52:28.275 Found match: 114:52:31.457 Found match: 014:52:32.950 Found match: 914:52:32.951 First match: Optional[0]一旦找到匹配項(在本例中為 16),findAny()不會立即返回,而是阻塞直到其余線程完成。在這種情況下,調(diào)用者在找到匹配后返回之前要額外等待 5 秒。
查看完整描述

3 回答

?
慕娘9325324

TA貢獻1783條經(jīng)驗 獲得超4個贊

相反,它似乎在返回之前等待 map 方法在大多數(shù)元素上完成。


這是不正確的。


當談到已經(jīng)被處理的元素時,它將等待所有元素的完成,因為 Stream API 允許并發(fā)處理本質(zhì)上不是線程安全的數(shù)據(jù)結構。在從終端操作返回之前,它必須確保所有潛在的并發(fā)訪問都已完成。


在談論整個流時,在 8 核機器上測試只有 14 個元素的流是不公平的。當然,至少會有 8 個并發(fā)操作開始,這就是全部。您正在通過使用findFirst()而不是為火焰添加燃料findAny(),因為這并不意味著按處理順序返回第一個找到的元素,而是按遇到順序返回第一個元素,即在您的示例中恰好為零,因此線程處理除第一個塊之外的其他塊不能假設他們的結果是正確答案,并且比使用 更愿意幫助處理其他候選人findAny()。


當你使用


List<Integer> nums = IntStream.range(0, 200).boxed().collect(Collectors.toList());

Optional<Integer> num = nums.parallelStream()

        .map(n -> {

            long delay = ThreadLocalRandom.current().nextInt(10_000);

            log("Waiting on " + n + " for " + delay + " ms");

            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(delay));

            return n * n;

        })

        .filter(n -> n < 40_000)

        .peek(n -> log("Found match: " + n))

        .findAny();


log("First match: " + num);

盡管流元素的數(shù)量要多得多,但您將獲得相似數(shù)量的任務運行完成。


請注意,CompletableFuture它也不支持中斷,因此我想到的唯一用于返回任何結果和取消其他作業(yè)的內(nèi)置功能是舊的ExecutorService.invokeAny.


要為其構建映射和過濾功能,我們可以使用以下輔助函數(shù):


static <T,R> Callable<R> mapAndfilter(T t, Function<T,R> f, Predicate<? super R> p) {

    return () -> {

        R r = f.apply(t);

        if(!p.test(r)) throw new NoSuchElementException();

        return r;

    };

}

不幸的是,只有使用值或異常完成的選項,因此我們必須對不匹配的元素使用異常。


然后我們可以像這樣使用它


ExecutorService es = ForkJoinPool.commonPool();

Integer result = es.invokeAny(IntStream.range(0, 100)

    .mapToObj(i -> mapAndfilter(i,

        n -> {

            long delay = ThreadLocalRandom.current().nextInt(10_000);

            log("Waiting on " + n + " for " + delay + " ms");

            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(delay));

            return n * n;

        },

        n -> n < 10_000))

    .collect(Collectors.toList()));


log("result: "+result);

它不僅會取消掛起的任務,還會在不等待它們完成的情況下返回。


當然,這意味著操作作業(yè)的源數(shù)據(jù)必須是不可變的或線程安全的。


查看完整回答
反對 回復 2021-09-29
?
九州編程

TA貢獻1785條經(jīng)驗 獲得超4個贊

您可以使用此代碼來說明 parallelStream 的工作原理:


final List<String> list = Arrays.asList("first", "second", "third", "4th", "5th", "7th", "8th", "9th", "10th", "11th", "12th", "13th");


    String result = list.parallelStream()

                        .map(s -> {

                            System.out.println("map: " + s);

                            return s;

                        })

                        .filter(s -> {

                            System.out.println("fiter: " + s);

                            return s.equals("8th");

                        })

                        .findFirst()

                        .orElse(null);


    System.out.println("result=" + result);

有兩種選擇可以實現(xiàn)您的目標,用過濾器停止昂貴的操作:


根本不要使用流,使用簡單的 for 或增強的 for

先過濾,然后用昂貴的操作映射


查看完整回答
反對 回復 2021-09-29
  • 3 回答
  • 0 關注
  • 193 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網(wǎng)微信公眾號