假設(shè)我有這個(gè):Flux<GroupedFlux<Integer, Integer>> intsGrouped = Flux.range(0, 12) .groupBy(i -> i % 3);并說(shuō)我有一個(gè)方法:Mono<Integer> getFromService(Integer i);我想為每個(gè)組并行呼叫,但請(qǐng)確保每個(gè)組中的呼叫是串行的。getFromService對(duì)于上面的示例,這將是具有這些輸入值的三個(gè)并行流:stream 1: 0 -> 3 -> 6 -> 9stream 2: 1 -> 4 -> 7 -> 10stream 3: 2 -> 5 -> 8 -> 11我試過(guò)這個(gè),但它沒(méi)有做我想做的事:Flux.range(0, 12) .groupBy(i -> i % 3) .flatMap(g -> g.flatMap(i -> getFromService(g.key(), i)))這是一次并行調(diào)用所有整數(shù)的服務(wù)。我該如何繼續(xù)?
1 回答

慕斯709654
TA貢獻(xiàn)1840條經(jīng)驗(yàn) 獲得超5個(gè)贊
使用“連接映射”
或“平面映射”序列性
而不是內(nèi)部.flatMap
如果要在每個(gè)組中按順序執(zhí)行(即每個(gè)組中一次只有一個(gè)訂閱),請(qǐng)使用 ,如下所示:getFromService
.concatMap
Flux.range(0, 12) .groupBy(i -> i % 3) .flatMap(g -> g.concatMap(i -> getFromService(g.key(), i)))
如果組內(nèi)的并行執(zhí)行是可以的,但您只關(guān)心序列的發(fā)出順序,則使用 ,如下所示:flatMapSequential
Flux.range(0, 12) .groupBy(i -> i % 3) .flatMap(g -> g.flatMapSequential(i -> getFromService(g.key(), i)))
另一種選擇是將參數(shù)設(shè)置為 使用,但我建議使用上述方法之一。.flatMap
concurrency
1
添加回答
舉報(bào)
0/150
提交
取消