3 回答

TA貢獻(xiàn)1783條經(jīng)驗(yàn) 獲得超4個(gè)贊
相反,它似乎在返回之前等待 map 方法在大多數(shù)元素上完成。
這是不正確的。
當(dāng)談到已經(jīng)被處理的元素時(shí),它將等待所有元素的完成,因?yàn)?Stream API 允許并發(fā)處理本質(zhì)上不是線(xiàn)程安全的數(shù)據(jù)結(jié)構(gòu)。在從終端操作返回之前,它必須確保所有潛在的并發(fā)訪(fǎng)問(wèn)都已完成。
在談?wù)撜麄€(gè)流時(shí),在 8 核機(jī)器上測(cè)試只有 14 個(gè)元素的流是不公平的。當(dāng)然,至少會(huì)有 8 個(gè)并發(fā)操作開(kāi)始,這就是全部。您正在通過(guò)使用findFirst()而不是為火焰添加燃料findAny(),因?yàn)檫@并不意味著按處理順序返回第一個(gè)找到的元素,而是按遇到順序返回第一個(gè)元素,即在您的示例中恰好為零,因此線(xiàn)程處理除第一個(gè)塊之外的其他塊不能假設(shè)他們的結(jié)果是正確答案,并且比使用 更愿意幫助處理其他候選人findAny()。
當(dāng)你使用
List<Integer> nums = IntStream.range(0, 200).boxed().collect(Collectors.toList());
Optional<Integer> num = nums.parallelStream()
.map(n -> {
long delay = ThreadLocalRandom.current().nextInt(10_000);
log("Waiting on " + n + " for " + delay + " ms");
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(delay));
return n * n;
})
.filter(n -> n < 40_000)
.peek(n -> log("Found match: " + n))
.findAny();
log("First match: " + num);
盡管流元素的數(shù)量要多得多,但您將獲得相似數(shù)量的任務(wù)運(yùn)行完成。
請(qǐng)注意,CompletableFuture它也不支持中斷,因此我想到的唯一用于返回任何結(jié)果和取消其他作業(yè)的內(nèi)置功能是舊的ExecutorService.invokeAny.
要為其構(gòu)建映射和過(guò)濾功能,我們可以使用以下輔助函數(shù):
static <T,R> Callable<R> mapAndfilter(T t, Function<T,R> f, Predicate<? super R> p) {
return () -> {
R r = f.apply(t);
if(!p.test(r)) throw new NoSuchElementException();
return r;
};
}
不幸的是,只有使用值或異常完成的選項(xiàng),因此我們必須對(duì)不匹配的元素使用異常。
然后我們可以像這樣使用它
ExecutorService es = ForkJoinPool.commonPool();
Integer result = es.invokeAny(IntStream.range(0, 100)
.mapToObj(i -> mapAndfilter(i,
n -> {
long delay = ThreadLocalRandom.current().nextInt(10_000);
log("Waiting on " + n + " for " + delay + " ms");
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(delay));
return n * n;
},
n -> n < 10_000))
.collect(Collectors.toList()));
log("result: "+result);
它不僅會(huì)取消掛起的任務(wù),還會(huì)在不等待它們完成的情況下返回。
當(dāng)然,這意味著操作作業(yè)的源數(shù)據(jù)必須是不可變的或線(xiàn)程安全的。

TA貢獻(xiàn)1785條經(jīng)驗(yàn) 獲得超4個(gè)贊
您可以使用此代碼來(lái)說(shuō)明 parallelStream 的工作原理:
final List<String> list = Arrays.asList("first", "second", "third", "4th", "5th", "7th", "8th", "9th", "10th", "11th", "12th", "13th");
String result = list.parallelStream()
.map(s -> {
System.out.println("map: " + s);
return s;
})
.filter(s -> {
System.out.println("fiter: " + s);
return s.equals("8th");
})
.findFirst()
.orElse(null);
System.out.println("result=" + result);
有兩種選擇可以實(shí)現(xiàn)您的目標(biāo),用過(guò)濾器停止昂貴的操作:
根本不要使用流,使用簡(jiǎn)單的 for 或增強(qiáng)的 for
先過(guò)濾,然后用昂貴的操作映射
添加回答
舉報(bào)