3 回答

TA貢獻(xiàn)1794條經(jīng)驗(yàn) 獲得超7個(gè)贊
發(fā)生異常時(shí)全部取消即可。障礙在于您在創(chuàng)建它們時(shí)并不了解所有這些,并且您不想多次執(zhí)行此工作。這可以通過創(chuàng)建一個(gè)新的、空的CompletableFuture第一個(gè)(我們稱之為f1)來解決。然后,像以前一樣創(chuàng)建 future,但f1.cancel在if(shouldStop()) { … }語句中插入對(duì) 的調(diào)用。然后,在創(chuàng)建所有 future 后,將一個(gè)操作鏈接起來,將所有 future 取消f1。
取消將達(dá)到兩個(gè)目的,它將阻止尚未開始的可運(yùn)行對(duì)象的執(zhí)行,并且將使未來通過不allOf等待仍在進(jìn)行的評(píng)估完成來返回。
由于取消 aCompletableFuture與使用 a 異常完成它沒有什么不同CancellationException,并且在出現(xiàn)多個(gè)異常的情況下,由 返回的 futureallOf將報(bào)告任意一個(gè),我們可以使用自completeExceptionally定義來MyException代替,以確保報(bào)告的異常不會(huì)是次要的CancellationException。
一個(gè)獨(dú)立的例子是:
static final AtomicInteger STOP = new AtomicInteger(2);
static boolean shouldStop() {
return STOP.getAndDecrement() <= 0;
}
static final int N = 10;
public static void main(String[] args) {
Set<Integer> elements = IntStream.range(0, 100).boxed().collect(Collectors.toSet());
ExecutorService executor = Executors.newFixedThreadPool(N);
try {
CompletableFuture<?> cancelAll = new CompletableFuture<>();
CompletableFuture<?>[] all = elements.stream()
.map(e ->
CompletableFuture.runAsync(() -> {
System.out.println("entered "+e);
if(shouldStop()) {
RuntimeException myException = new RuntimeException("stopped");
// alternatively cancelAll.cancel(false);
cancelAll.completeExceptionally(myException);
throw myException;
}
System.out.println("processing "+e);
}, executor))
.toArray(CompletableFuture<?>[]::new);
cancelAll.whenComplete((value,throwable) -> {
if(throwable != null) {
for(CompletableFuture<?> cf: all) cf.completeExceptionally(throwable);
}
});
CompletableFuture.allOf(all).join();
} catch (CompletionException e) {
e.printStackTrace();
}
executor.shutdown();
}
這會(huì)打印類似的東西
entered 3
entered 8
entered 4
entered 6
entered 1
entered 9
entered 0
entered 7
entered 5
entered 2
entered 10
processing 8
processing 3
java.util.concurrent.CompletionException: java.lang.RuntimeException: stopped
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
at java.base/java.util.concurrent.CompletableFuture$BiRelay.tryFire(CompletableFuture.java:1423)
at java.base/java.util.concurrent.CompletableFuture$CoCompletion.tryFire(CompletableFuture.java:1144)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at CompletableFutureTest.lambda$main$3(CompletableFutureTest.java:34)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at CompletableFutureTest.lambda$main$0(CompletableFutureTest.java:26)
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1736)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: stopped
at CompletableFutureTest.lambda$main$0(CompletableFutureTest.java:25)
... 4 more
顯示由于并發(fā)性,一些可運(yùn)行對(duì)象已經(jīng)在運(yùn)行,但一旦傳播取消,就不會(huì)啟動(dòng)后續(xù)執(zhí)行。
請(qǐng)注意,由于cancelAll只會(huì)在異常情況下完成或根本不會(huì)完成,cancelAll.whenComplete((value,throwable) -> { for(CompletableFuture<?> cf: all) cf.completeExceptionally(throwable); });因此您可以將鏈接操作簡(jiǎn)化為,但這只是編碼風(fēng)格是否保留冗余檢查的問題。
您還可以向處理步驟添加延遲,以確保allOf(all).join()在滿足停止條件時(shí)不會(huì)等待完成。
還可以將一個(gè)操作鏈接到返回的 future,runAsync該操作將在任何異常完成時(shí)取消所有操作,而不僅僅是顯式停止。但是,必須注意返回表示通過 安排的操作的原始未來,runAsync而不是返回的未來whenComplete。
CompletableFuture<?> cancelAll = new CompletableFuture<>();
CompletableFuture<?>[] all = elements.stream()
.map(e -> {
CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
System.out.println("entered "+e);
if(shouldStop()) throw new RuntimeException("stopped");
System.out.println("processing "+e);
}, executor);
cf.whenComplete((value,throwable) -> {
if(throwable != null) cancelAll.completeExceptionally(throwable);
});
return cf;
})
.toArray(CompletableFuture<?>[]::new);
cancelAll.whenComplete((value,throwable) -> {
for(CompletableFuture<?> cf: all) cf.completeExceptionally(throwable);
});
CompletableFuture.allOf(all).join();

TA貢獻(xiàn)1784條經(jīng)驗(yàn) 獲得超8個(gè)贊
我對(duì) s 沒有太多(當(dāng)然沒有!)經(jīng)驗(yàn)CompletableFuture
,但我確實(shí)有一個(gè)建議(可能有幫助?)你可以在CompletableFuture.allOf(elements.stream().map
try 塊外部聲明 lambda 嗎?這樣,在嘗試內(nèi)部之前,所有期貨都不會(huì)運(yùn)行。但它們?nèi)匀豢梢员?catch 塊訪問。在其中您可以完成cancel
所有這些。

TA貢獻(xiàn)1789條經(jīng)驗(yàn) 獲得超10個(gè)贊
您應(yīng)該做的主要事情是interrupt希望更快地終止所有正在運(yùn)行的任務(wù),這意味著這些任務(wù)可能需要檢查中斷,以便它們知道停止正在做的事情并更快地終止。
此外,您可以在主線程中繼續(xù)并讓它們?cè)诤笈_(tái)終止,而不是等待被中斷的任務(wù)實(shí)際終止。
public static void main(String[] args) {
List<Integer> elements = Arrays.asList(5, null, 6, 3, 4); // these elements will fail fast
// List<Integer> elements = Arrays.asList(5, 2, 6, 3, 4); // these elements will succeed
try {
CountDownLatch latch = new CountDownLatch(elements.size());
ExecutorService executor = Executors.newFixedThreadPool(elements.size());
elements.stream().forEach(e -> {
executor.execute(() -> {
try {
doSomething(e);
latch.countDown();
} catch (Exception ex) {
// shutdown executor ASAP on exception, read the docs for `shutdownNow()`
// it will interrupt all tasks in the executor
if (!executor.isShutdown()) {
executor.shutdownNow();
}
for (int i = (int) latch.getCount(); i >= 0; i--) {
latch.countDown();
}
// log the exception
ex.printStackTrace(System.out);
}
});
});
latch.await();
if (executor.isShutdown()) {
System.out.println("Tasks failed! Terminating remaining tasks in the background.");
} else {
executor.shutdown();
System.out.println("Tasks succeeded!");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void doSomething(Integer sleepSecs) {
// You will want to check for `interrupted()` throughout the method you want to be able to cancel
if (Thread.interrupted()) {
System.out.println(Thread.currentThread().getName() + " interrupted early");
return;
}
if (sleepSecs == null) {
System.out.println(Thread.currentThread().getName() + " throwing exception ");
throw new RuntimeException();
}
try {
System.out.println(Thread.currentThread().getName() + " started interruptable sleep for " + sleepSecs + "s");
Thread.sleep(sleepSecs * 1000);
System.out.println(Thread.currentThread().getName() + " finished interruptable sleep" + sleepSecs + "s");
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " interrupted sleep!");
}
// ...possibly some part of the task that can't be skipped, such as cleanup
System.out.println(Thread.currentThread().getName() + " complete!");
}
添加回答
舉報(bào)