第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何在 Spring Flux 中并行化數(shù)據(jù)庫查詢?

如何在 Spring Flux 中并行化數(shù)據(jù)庫查詢?

qq_花開花謝_0 2023-04-19 16:07:42
我想在 Spring 中mysql使用Flux<JSONObject>流公開來自數(shù)據(jù)庫的聚合結(jié)果。@RestControllerpublic class FluxController {     @GetMapping("/", produces = TEXT_EVENT_STREAM_VALUE)     public Flux<JSONObject> stream() {          return service.getJson();     }}@Servicepublic class DatabaseService {    public List<JSONObject> getJson() {        List<Long> refs = jdbc.queryForList(...);        MapSqlParameterSource params = new MapSqlParameterSource();        params.addValue("refs", refs);        //of course real world sql is much more complex        List<Long, Product> products = jdbc.query(SELECT * from products where ref IN (:refs), params);        List<Long, Item> items = jdbc.query(SELECT * from items where ref IN (:refs), params);        List<Long, Warehouse> warehouses = jdbc.query(SELECT * from warehouses where ref IN (:refs), params);        List<JSONObject> results = new ArrayList<>();        for (Long ref : refs) {            JSONObject json = new JSONObject();            json.put("ref", ref);            json.put("product", products.get(ref));            json.put("item", items.get(ref));            json.put("warehouse", warehouses.get(ref));            results.add(json);        }        return results;    }現(xiàn)在我想將其轉(zhuǎn)換為通量,將其作為事件流公開。但是我怎樣才能并行化數(shù)據(jù)庫查找并將它鏈接在一起成為一個通量呢?    public Flux<JSONObject> getJsonFlux() {        //I need this as source        List<Long> refs = jdbc.queryForList(...);        return Flux.fromIterable(refs).map(refs -> {            //TODO how to aggregate the different database calls concurrently?            //and then expose each JSONObject one by one into the stream as soon as it is build?        };    }旁注:我知道這仍然會阻塞。但在我的實際應(yīng)用程序中,我正在應(yīng)用分頁和分塊,所以每個塊都會在準備好時暴露給流。然后主要問題是我不知道如何并行化,然后聚合/合并結(jié)果,例如在最后一個通量步驟中。
查看完整描述

2 回答

?
慕工程0101907

TA貢獻1887條經(jīng)驗 獲得超5個贊

這個想法是首先獲取 的完整列表refs,然后同時獲取 Products、Items 和 Warehouses——我稱之為 Tuple3 lookups。然后將每一個ref與組合起來,并一一lookups轉(zhuǎn)換。JSONObject


return Mono.fromCallable(jdbc::queryForList) //fetches refs

? ? ? ? ? ? ? ? .subscribeOn(Schedulers.elastic())

? ? ? ? ? ? ? ? .flatMapMany(refList -> { //flatMapMany allows to convert Mono to Flux in flatMap operation

? ? ? ? ? ? ? ? ? ? ? ? ? ? Flux<Tuple3<Map<Long, Product>, Map<Long, Item>, Map<Long, Warehouse>>> lookups = Mono.zip(fetchProducts(refList), fetchItems(refList), fetchWarehouses(refList))

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .cache().repeat(); //notice cache - it makes sure that Mono.zip is executed only once, not for each zipWith call


? ? ? ? ? ? ? ? ? ? ? ? ? ? return Flux.fromIterable(refList)

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .zipWith(lookups);

? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? )

? ? ? ? ? ? ? ? .map(t -> {

? ? ? ? ? ? ? ? ? ? Long ref = t.getT1();

? ? ? ? ? ? ? ? ? ? Tuple3<Map<Long, Product>, Map<Long, Item>, Map<Long, Warehouse>> lookups = t.getT2();

? ? ? ? ? ? ? ? ? ? JSONObject json = new JSONObject();

? ? ? ? ? ? ? ? ? ? json.put("ref", ref);

? ? ? ? ? ? ? ? ? ? json.put("product", lookups.getT1().get(ref));

? ? ? ? ? ? ? ? ? ? json.put("item", lookups.getT2().get(ref));

? ? ? ? ? ? ? ? ? ? json.put("warehouse", lookups.getT3().get(ref));

? ? ? ? ? ? ? ? ? ? return json;

? ? ? ? ? ? ? ? });

每個數(shù)據(jù)庫調(diào)用的方法:


Mono<Map<Long, Product>> fetchProducts(List<Long> refs) {

? ? return Mono.fromCallable(() -> jdbc.query(SELECT * from products where ref IN(:refs),params))

? ? ? ? .subscribeOn(Schedulers.elastic());

}


Mono<Map<Long, Item>> fetchItems(List<Long> refs) {

? ? return Mono.fromCallable(() -> jdbc.query(SELECT * from items where ref IN(:refs),params))

? ? ? ? .subscribeOn(Schedulers.elastic());

}


Mono<Map<Long, Warehouse>> fetchWarehouses(List<Long> refs) {

? ? return Mono.fromCallable(() -> jdbc.query(SELECT * from warehouses where ref IN(:refs),params))

? ? ? ? .subscribeOn(Schedulers.elastic());

}

為什么我需要訂閱?

我之所以這樣說是因為兩個原因:

  1. 它允許在專用線程池的線程上執(zhí)行數(shù)據(jù)庫查詢,從而防止阻塞主線程

  2. 它允許真正并行化Mono.zip??吹竭@個,它是關(guān)于的flatMap,但它也適用于zip


.flatMap()為了完整起見,在 zip 結(jié)果上使用時也是可能的。雖然我不確定.cache()這里是否還有必要。

? .flatMapMany(refList -> {

? ? ? ? Mono.zip(fetchProducts(refList), fetchItems(refList), fetchWarehouses(refList)).cache()

? ? ? ? ? ? .flatMap(tuple -> Flux.fromIterable(refList).map(refId -> Tuples.of(refId, tuple)));

? ? .map(tuple -> {

? ? ? ? String refId = tuple.getT1();

? ? ? ? Tuple lookups = tuple.getT2();

? ? }

})


查看完整回答
反對 回復(fù) 2023-04-19
?
白豬掌柜的

TA貢獻1893條經(jīng)驗 獲得超10個贊

如果我理解得很好,您想通過將所有引用作為參數(shù)傳遞來執(zhí)行查詢。


它不會真正成為一個事件流,因為它會等到所有查詢都完成并且所有 json 對象都在內(nèi)存中,然后才開始流式傳輸它們。


public Flux<JSONObject> getJsonFlux()

{

    return Mono.fromCallable(jdbc::queryForList)

               .subscribeOn(Schedulers.elastic()) // elastic thread pool meant for blocking IO, you can use a custom one

               .flatMap(this::queryEntities)

               .map(this::createJsonObjects)

               .flatMapMany(Flux::fromIterable);

}


private Mono<Tuple4<List<Long>, List<Product>, List<Item>, List<Warehouse>>> queryEntities(List<Long> refs)

{

    Mono<List<Product>> products = Mono.fromCallable(() -> jdbc.queryProducts(refs)).subscribeOn(Schedulers.elastic());

    Mono<List<Item>> items = Mono.fromCallable(() -> jdbc.queryItems(refs)).subscribeOn(Schedulers.elastic());

    Mono<List<Warehouse>> warehouses = Mono.fromCallable(() -> jdbc.queryWarehouses(refs)).subscribeOn(Schedulers.elastic());


    return Mono.zip(Mono.just(refs), products, items, warehouses); // query calls will be concurrent

}


private List<JSONObject> createJsonObjects(Tuple4<List<Long>, List<Product>, List<Item>, List<Warehouse>> tuple)

{

    List<Long> refs = tuple.getT1();

    List<Product> products = tuple.getT2();

    List<Item> items = tuple.getT3();

    List<Warehouse> warehouses = tuple.getT4();


    List<JSONObject> jsonObjects = new ArrayList<>();


    for (Long ref : refs)

    {

        JSONObject json = new JSONObject();

        // build json object here


        jsonObjects.add(json);

    }


    return jsonObjects;

}

另一種方法是分別查詢每個引用的實體。這樣每個 JSONObject 都被單獨查詢,并且它們可以在流中交錯。我不確定數(shù)據(jù)庫如何處理這種負載。這是你應(yīng)該考慮的事情。


public Flux<JSONObject> getJsonFlux()

{

    return Mono.fromCallable(jdbc::queryForList)

               .flatMapMany(Flux::fromIterable)

               .subscribeOn(Schedulers.elastic()) // elastic thread pool meant for blocking IO, you can use a custom one

               .flatMap(this::queryEntities)

               .map(this::createJsonObject);

}


private Mono<Tuple4<Long, Product, Item, Warehouse>> queryEntities(Long ref)

{

    Mono<Product> product = Mono.fromCallable(() -> jdbc.queryProduct(ref)).subscribeOn(Schedulers.elastic());

    Mono<Item> item = Mono.fromCallable(() -> jdbc.queryItem(ref)).subscribeOn(Schedulers.elastic());

    Mono<Warehouse> warehouse = Mono.fromCallable(() -> jdbc.queryWarehouse(ref))

                                     .subscribeOn(Schedulers.elastic());


    return Mono.zip(Mono.just(ref), product, item, warehouse); // query calls will be concurrent

}


private JSONObject createJsonObject(Tuple4<Long, Product, Item, Warehouse> tuple)

{

    Long ref = tuple.getT1();

    Product product = tuple.getT2();

    Item item = tuple.getT3();

    Warehouse warehouse = tuple.getT4();


    JSONObject json = new JSONObject();

    // build json object here


    return json;

}


查看完整回答
反對 回復(fù) 2023-04-19
  • 2 回答
  • 0 關(guān)注
  • 209 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學(xué)習(xí)伙伴

公眾號

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號