第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定
已解決430363個(gè)問題,去搜搜看,總會(huì)有你想問的

當(dāng)其中之一拋出異常時(shí)如何阻止可運(yùn)行對(duì)象的執(zhí)行

當(dāng)其中之一拋出異常時(shí)如何阻止可運(yùn)行對(duì)象的執(zhí)行

千萬里不及你 2023-07-19 17:05:51
我有一組元素,對(duì)于每個(gè)元素,我都執(zhí)行方法,將其作為 Runnable 傳遞給 CompletableFuture.runAsync() 。在執(zhí)行過程中,可能需要停止整個(gè)計(jì)算,因此我在執(zhí)行方法之前檢查一些條件。如果計(jì)算應(yīng)該停止,那么我會(huì)拋出一個(gè)異常,該異常在 CompletableFuture 之外處理。我想阻止所有 Runnables 的執(zhí)行,這些 Runnables 在拋出異常后執(zhí)行。因此,換句話說,當(dāng)其中任何一個(gè) CompletableFuture 拋出異常時(shí),我不想等待所有 CompletableFuture 完成。Set elements = ...Executor executor = Executors.newFixedThreadPool(N);try {    CompletableFuture.allOf(elements.stream().map(e - > CompletableFuture.runAsync(() - > {        if (shouldStop()) {            throw new MyException();        }        myMethod(e);    }, executor)).toArray(CompletableFuture[]::new)).join()} catch (CompletionException e) {    ...}
查看完整描述

3 回答

?
慕田峪9158850

TA貢獻(xiàn)1794條經(jīng)驗(yàn) 獲得超7個(gè)贊

發(fā)生異常時(shí)全部取消即可。障礙在于您在創(chuàng)建它們時(shí)并不了解所有這些,并且您不想多次執(zhí)行此工作。這可以通過創(chuàng)建一個(gè)新的、空的CompletableFuture第一個(gè)(我們稱之為f1)來解決。然后,像以前一樣創(chuàng)建 future,但f1.cancel在if(shouldStop()) { … }語句中插入對(duì) 的調(diào)用。然后,在創(chuàng)建所有 future 后,將一個(gè)操作鏈接起來,將所有 future 取消f1。


取消將達(dá)到兩個(gè)目的,它將阻止尚未開始的可運(yùn)行對(duì)象的執(zhí)行,并且將使未來通過不allOf等待仍在進(jìn)行的評(píng)估完成來返回。


由于取消 aCompletableFuture與使用 a 異常完成它沒有什么不同CancellationException,并且在出現(xiàn)多個(gè)異常的情況下,由 返回的 futureallOf將報(bào)告任意一個(gè),我們可以使用自completeExceptionally定義來MyException代替,以確保報(bào)告的異常不會(huì)是次要的CancellationException。


一個(gè)獨(dú)立的例子是:


static final AtomicInteger STOP = new AtomicInteger(2);

static boolean shouldStop() {

    return STOP.getAndDecrement() <= 0;

}

static final int N = 10;

public static void main(String[] args) {

    Set<Integer> elements = IntStream.range(0, 100).boxed().collect(Collectors.toSet());

    ExecutorService executor = Executors.newFixedThreadPool(N);

    try {

        CompletableFuture<?> cancelAll = new CompletableFuture<>();

        CompletableFuture<?>[] all = elements.stream()

            .map(e ->

                CompletableFuture.runAsync(() -> {

                    System.out.println("entered "+e);

                    if(shouldStop()) {

                        RuntimeException myException = new RuntimeException("stopped");

                         // alternatively cancelAll.cancel(false);

                        cancelAll.completeExceptionally(myException);

                        throw myException;

                    }

                    System.out.println("processing "+e);

                }, executor))

            .toArray(CompletableFuture<?>[]::new);

        cancelAll.whenComplete((value,throwable) -> {

            if(throwable != null) {

                for(CompletableFuture<?> cf: all) cf.completeExceptionally(throwable);

            }

        });

        CompletableFuture.allOf(all).join();

    } catch (CompletionException e) {

        e.printStackTrace();

    }

    executor.shutdown();

}

這會(huì)打印類似的東西


entered 3

entered 8

entered 4

entered 6

entered 1

entered 9

entered 0

entered 7

entered 5

entered 2

entered 10

processing 8

processing 3

java.util.concurrent.CompletionException: java.lang.RuntimeException: stopped

    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)

    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)

    at java.base/java.util.concurrent.CompletableFuture$BiRelay.tryFire(CompletableFuture.java:1423)

    at java.base/java.util.concurrent.CompletableFuture$CoCompletion.tryFire(CompletableFuture.java:1144)

    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)

    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)

    at CompletableFutureTest.lambda$main$3(CompletableFutureTest.java:34)

    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)

    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)

    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)

    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)

    at CompletableFutureTest.lambda$main$0(CompletableFutureTest.java:26)

    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1736)

    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)

    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)

    at java.base/java.lang.Thread.run(Thread.java:834)

Caused by: java.lang.RuntimeException: stopped

    at CompletableFutureTest.lambda$main$0(CompletableFutureTest.java:25)

    ... 4 more

顯示由于并發(fā)性,一些可運(yùn)行對(duì)象已經(jīng)在運(yùn)行,但一旦傳播取消,就不會(huì)啟動(dòng)后續(xù)執(zhí)行。


請(qǐng)注意,由于cancelAll只會(huì)在異常情況下完成或根本不會(huì)完成,cancelAll.whenComplete((value,throwable) -> { for(CompletableFuture<?> cf: all) cf.completeExceptionally(throwable); });因此您可以將鏈接操作簡(jiǎn)化為,但這只是編碼風(fēng)格是否保留冗余檢查的問題。


您還可以向處理步驟添加延遲,以確保allOf(all).join()在滿足停止條件時(shí)不會(huì)等待完成。


還可以將一個(gè)操作鏈接到返回的 future,runAsync該操作將在任何異常完成時(shí)取消所有操作,而不僅僅是顯式停止。但是,必須注意返回表示通過 安排的操作的原始未來,runAsync而不是返回的未來whenComplete。


CompletableFuture<?> cancelAll = new CompletableFuture<>();

CompletableFuture<?>[] all = elements.stream()

    .map(e -> {

        CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {

            System.out.println("entered "+e);

            if(shouldStop()) throw new RuntimeException("stopped");

            System.out.println("processing "+e);

        }, executor);

        cf.whenComplete((value,throwable) -> {

            if(throwable != null) cancelAll.completeExceptionally(throwable);

        });

        return cf;

    })

    .toArray(CompletableFuture<?>[]::new);

cancelAll.whenComplete((value,throwable) -> {

    for(CompletableFuture<?> cf: all) cf.completeExceptionally(throwable);

});

CompletableFuture.allOf(all).join();


查看完整回答
反對(duì) 回復(fù) 2023-07-19
?
青春有我

TA貢獻(xiàn)1784條經(jīng)驗(yàn) 獲得超8個(gè)贊

我對(duì) s 沒有太多(當(dāng)然沒有!)經(jīng)驗(yàn)CompletableFuture,但我確實(shí)有一個(gè)建議(可能有幫助?)你可以在CompletableFuture.allOf(elements.stream().maptry 塊外部聲明 lambda 嗎?這樣,在嘗試內(nèi)部之前,所有期貨都不會(huì)運(yùn)行。但它們?nèi)匀豢梢员?catch 塊訪問。在其中您可以完成cancel所有這些。



查看完整回答
反對(duì) 回復(fù) 2023-07-19
?
至尊寶的傳說

TA貢獻(xiàn)1789條經(jīng)驗(yàn) 獲得超10個(gè)贊

您應(yīng)該做的主要事情是interrupt希望更快地終止所有正在運(yùn)行的任務(wù),這意味著這些任務(wù)可能需要檢查中斷,以便它們知道停止正在做的事情并更快地終止。


此外,您可以在主線程中繼續(xù)并讓它們?cè)诤笈_(tái)終止,而不是等待被中斷的任務(wù)實(shí)際終止。


public static void main(String[] args) {

    List<Integer> elements = Arrays.asList(5, null, 6, 3, 4); // these elements will fail fast

    // List<Integer> elements = Arrays.asList(5, 2, 6, 3, 4); // these elements will succeed


    try {

        CountDownLatch latch = new CountDownLatch(elements.size());

        ExecutorService executor = Executors.newFixedThreadPool(elements.size());

        elements.stream().forEach(e -> {

            executor.execute(() -> {

                try {

                    doSomething(e);

                    latch.countDown();

                } catch (Exception ex) {

                    // shutdown executor ASAP on exception, read the docs for `shutdownNow()`

                    // it will interrupt all tasks in the executor

                    if (!executor.isShutdown()) {

                        executor.shutdownNow();

                    }

                    for (int i = (int) latch.getCount(); i >= 0; i--) {

                        latch.countDown();

                    }

                    // log the exception

                    ex.printStackTrace(System.out);

                }

            });

        });

        latch.await();

        if (executor.isShutdown()) {

            System.out.println("Tasks failed! Terminating remaining tasks in the background.");

        } else {

            executor.shutdown();

            System.out.println("Tasks succeeded!");

        }

    } catch (InterruptedException e) {

        e.printStackTrace();

    }

}


public static void doSomething(Integer sleepSecs) {

    // You will want to check for `interrupted()` throughout the method you want to be able to cancel

    if (Thread.interrupted()) {

        System.out.println(Thread.currentThread().getName() + " interrupted early");

        return;

    }


    if (sleepSecs == null) {

        System.out.println(Thread.currentThread().getName() + " throwing exception ");

        throw new RuntimeException();

    }


    try {

        System.out.println(Thread.currentThread().getName() + " started interruptable sleep for " + sleepSecs + "s");

        Thread.sleep(sleepSecs * 1000);

        System.out.println(Thread.currentThread().getName() + " finished interruptable sleep" + sleepSecs + "s");

    } catch (InterruptedException e) {

        System.out.println(Thread.currentThread().getName() + " interrupted sleep!");

    }


    // ...possibly some part of the task that can't be skipped, such as cleanup


    System.out.println(Thread.currentThread().getName() + " complete!");

}


查看完整回答
反對(duì) 回復(fù) 2023-07-19
  • 3 回答
  • 0 關(guān)注
  • 168 瀏覽
慕課專欄
更多

添加回答

舉報(bào)

0/150
提交
取消
微信客服

購(gòu)課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)