2 回答

TA貢獻(xiàn)1829條經(jīng)驗(yàn) 獲得超9個(gè)贊
我的解決方案主題是(它可以與 JDK 9+ 一起使用,因?yàn)樽栽摪姹疽詠?lái)公開(kāi)了幾個(gè)可覆蓋的方法)
讓整個(gè)生態(tài)系統(tǒng)了解 MDC
為此,我們需要解決以下場(chǎng)景:
我們什么時(shí)候從這個(gè)類中獲得 CompletableFuture 的新實(shí)例?→ 我們需要返回相同的 MDC 感知版本。
我們什么時(shí)候從這個(gè)類之外獲得 CompletableFuture 的新實(shí)例?→ 我們需要返回相同的 MDC 感知版本。
在 CompletableFuture 類中使用哪個(gè)執(zhí)行器?→ 在任何情況下,我們都需要確保所有執(zhí)行者都了解 MDC
為此,讓我們CompletableFuture
通過(guò)擴(kuò)展它來(lái)創(chuàng)建一個(gè) MDC 感知版本類。我的版本如下所示
import org.slf4j.MDC;
import java.util.Map;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.function.Supplier;
public class MDCAwareCompletableFuture<T> extends CompletableFuture<T> {
public static final ExecutorService MDC_AWARE_ASYNC_POOL = new MDCAwareForkJoinPool();
@Override
public CompletableFuture newIncompleteFuture() {
return new MDCAwareCompletableFuture();
}
@Override
public Executor defaultExecutor() {
return MDC_AWARE_ASYNC_POOL;
}
public static <T> CompletionStage<T> getMDCAwareCompletionStage(CompletableFuture<T> future) {
return new MDCAwareCompletableFuture<>()
.completeAsync(() -> null)
.thenCombineAsync(future, (aVoid, value) -> value);
}
public static <T> CompletionStage<T> getMDCHandledCompletionStage(CompletableFuture<T> future,
Function<Throwable, T> throwableFunction) {
Map<String, String> contextMap = MDC.getCopyOfContextMap();
return getMDCAwareCompletionStage(future)
.handle((value, throwable) -> {
setMDCContext(contextMap);
if (throwable != null) {
return throwableFunction.apply(throwable);
}
return value;
});
}
}
該類MDCAwareForkJoinPool看起來(lái)像(ForkJoinTask為簡(jiǎn)單起見(jiàn),跳過(guò)了帶參數(shù)的方法)
public class MDCAwareForkJoinPool extends ForkJoinPool {
//Override constructors which you need
@Override
public <T> ForkJoinTask<T> submit(Callable<T> task) {
return super.submit(MDCUtility.wrapWithMdcContext(task));
}
@Override
public <T> ForkJoinTask<T> submit(Runnable task, T result) {
return super.submit(wrapWithMdcContext(task), result);
}
@Override
public ForkJoinTask<?> submit(Runnable task) {
return super.submit(wrapWithMdcContext(task));
}
@Override
public void execute(Runnable task) {
super.execute(wrapWithMdcContext(task));
}
}
包裝的實(shí)用方法如下
public static <T> Callable<T> wrapWithMdcContext(Callable<T> task) {
//save the current MDC context
Map<String, String> contextMap = MDC.getCopyOfContextMap();
return () -> {
setMDCContext(contextMap);
try {
return task.call();
} finally {
// once the task is complete, clear MDC
MDC.clear();
}
};
}
public static Runnable wrapWithMdcContext(Runnable task) {
//save the current MDC context
Map<String, String> contextMap = MDC.getCopyOfContextMap();
return () -> {
setMDCContext(contextMap);
try {
task.run();
} finally {
// once the task is complete, clear MDC
MDC.clear();
}
};
}
public static void setMDCContext(Map<String, String> contextMap) {
MDC.clear();
if (contextMap != null) {
MDC.setContextMap(contextMap);
}
}
以下是一些使用指南:
使用類
MDCAwareCompletableFuture
而不是類CompletableFuture
。類中的幾個(gè)方法
CompletableFuture
實(shí)例化了 self 版本,例如new CompletableFuture...
. 對(duì)于此類方法(大多數(shù)公共靜態(tài)方法),請(qǐng)使用替代方法來(lái)獲取MDCAwareCompletableFuture
. 使用替代方法的示例可能是,而不是使用CompletableFuture.supplyAsync(...)
,您可以選擇new MDCAwareCompletableFuture<>().completeAsync(...)
當(dāng)您因?yàn)槟硞€(gè)外部庫(kù)返回
CompletableFuture
一個(gè). 顯然,您不能在該庫(kù)中保留上下文,但是在您的代碼命中應(yīng)用程序代碼后,此方法仍會(huì)保留上下文。MDCAwareCompletableFuture
getMDCAwareCompletionStage
CompletableFuture
在提供 executor 作為參數(shù)時(shí),請(qǐng)確保它是 MDC Aware,例如
MDCAwareForkJoinPool
. 您也可以MDCAwareThreadPoolExecutor
通過(guò)覆蓋execute
方法創(chuàng)建以服務(wù)于您的用例。你明白了!
您可以在一篇關(guān)于相同內(nèi)容的帖子中找到上述所有內(nèi)容的詳細(xì)說(shuō)明。
這樣,您的代碼可能看起來(lái)像
new MDCAwareCompletableFuture<>().completeAsync(() -> { getAcountDetails(user); return null; });

TA貢獻(xiàn)1808條經(jīng)驗(yàn) 獲得超4個(gè)贊
創(chuàng)建包裝方法
static CompletableFuture<Void> myMethod(Runnable runnable) {
Map<String, String> previous = MDC.getCopyOfContextMap();
return CompletableFuture.runAsync(() -> {
MDC.setContextMap(previous);
try {
runnable.run();
} finally {
MDC.clear();
}
});
}
并使用它代替CompletableFuture.runAsync.
添加回答
舉報(bào)