3 回答

TA貢獻1783條經(jīng)驗 獲得超4個贊
相反,它似乎在返回之前等待 map 方法在大多數(shù)元素上完成。
這是不正確的。
當談到已經(jīng)被處理的元素時,它將等待所有元素的完成,因為 Stream API 允許并發(fā)處理本質(zhì)上不是線程安全的數(shù)據(jù)結構。在從終端操作返回之前,它必須確保所有潛在的并發(fā)訪問都已完成。
在談論整個流時,在 8 核機器上測試只有 14 個元素的流是不公平的。當然,至少會有 8 個并發(fā)操作開始,這就是全部。您正在通過使用findFirst()而不是為火焰添加燃料findAny(),因為這并不意味著按處理順序返回第一個找到的元素,而是按遇到順序返回第一個元素,即在您的示例中恰好為零,因此線程處理除第一個塊之外的其他塊不能假設他們的結果是正確答案,并且比使用 更愿意幫助處理其他候選人findAny()。
當你使用
List<Integer> nums = IntStream.range(0, 200).boxed().collect(Collectors.toList());
Optional<Integer> num = nums.parallelStream()
.map(n -> {
long delay = ThreadLocalRandom.current().nextInt(10_000);
log("Waiting on " + n + " for " + delay + " ms");
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(delay));
return n * n;
})
.filter(n -> n < 40_000)
.peek(n -> log("Found match: " + n))
.findAny();
log("First match: " + num);
盡管流元素的數(shù)量要多得多,但您將獲得相似數(shù)量的任務運行完成。
請注意,CompletableFuture它也不支持中斷,因此我想到的唯一用于返回任何結果和取消其他作業(yè)的內(nèi)置功能是舊的ExecutorService.invokeAny.
要為其構建映射和過濾功能,我們可以使用以下輔助函數(shù):
static <T,R> Callable<R> mapAndfilter(T t, Function<T,R> f, Predicate<? super R> p) {
return () -> {
R r = f.apply(t);
if(!p.test(r)) throw new NoSuchElementException();
return r;
};
}
不幸的是,只有使用值或異常完成的選項,因此我們必須對不匹配的元素使用異常。
然后我們可以像這樣使用它
ExecutorService es = ForkJoinPool.commonPool();
Integer result = es.invokeAny(IntStream.range(0, 100)
.mapToObj(i -> mapAndfilter(i,
n -> {
long delay = ThreadLocalRandom.current().nextInt(10_000);
log("Waiting on " + n + " for " + delay + " ms");
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(delay));
return n * n;
},
n -> n < 10_000))
.collect(Collectors.toList()));
log("result: "+result);
它不僅會取消掛起的任務,還會在不等待它們完成的情況下返回。
當然,這意味著操作作業(yè)的源數(shù)據(jù)必須是不可變的或線程安全的。

TA貢獻1785條經(jīng)驗 獲得超4個贊
您可以使用此代碼來說明 parallelStream 的工作原理:
final List<String> list = Arrays.asList("first", "second", "third", "4th", "5th", "7th", "8th", "9th", "10th", "11th", "12th", "13th");
String result = list.parallelStream()
.map(s -> {
System.out.println("map: " + s);
return s;
})
.filter(s -> {
System.out.println("fiter: " + s);
return s.equals("8th");
})
.findFirst()
.orElse(null);
System.out.println("result=" + result);
有兩種選擇可以實現(xiàn)您的目標,用過濾器停止昂貴的操作:
根本不要使用流,使用簡單的 for 或增強的 for
先過濾,然后用昂貴的操作映射
添加回答
舉報