在Spring Boot 2 with Reactor中,我試圖合并兩個熱源。但是,似乎唯一一個報告了 中的兩個參數(shù)中的第一個。我如何識別第二個.FluxmergeFluxmergemergeFlux在下面的示例中,in 甚至不會打印 when 是第一個參數(shù)。如果我做第一個,那么不打印。System.errB-2outgoing1aoutgoing2A-2以下是完整的示例;package com.example.demo;import java.time.Duration;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;import reactor.core.publisher.Flux;import reactor.core.scheduler.Schedulers;public class Weather {String city;Integer temperature;public Weather(String city, Integer temperature) { this.city = city; this.temperature = temperature;}@Overridepublic String toString() { return "Weather [city=" + city + ", temperature=" + temperature + "]";}public static void main(String[] args) { BlockingQueue<Weather> queue = new LinkedBlockingQueue<>(); BlockingQueue<Weather> queue2 = new LinkedBlockingQueue<>(); // Assume Spring @Repository "A-1" new Thread(() -> { for (int d = 1; d < 1000; d += 1) { for (String s: new String[] {"LDN", "NYC", "PAR", "ZUR"}) { queue.add(new Weather(s, d)); try { Thread.sleep(250); } catch (InterruptedException e) {} } } }).start(); // Assume Spring @Repository "B-1" new Thread(() -> { for (int d = 1; d < 1000; d += 1) { for (String s: new String[] {"MOS", "TLV"}) { queue2.add(new Weather(s, d)); try { Thread.sleep(1000); } catch (InterruptedException e) {} } } }).start(); // Assume Spring @Service "A-2" = real-time LDN, NYC, PAR, ZUR Flux<Weather> outgoing1 = Flux.<Weather>create( sink -> { for (int i = 0; i < 1000; i++) { try { sink.next(queue.take()); System.err.println("1 " + queue.size()); } catch (InterruptedException e) { e.printStackTrace(); } } sink.complete(); } )
1 回答

阿晨1998
TA貢獻2037條經(jīng)驗 獲得超6個贊
這里有一些事情在起作用。
請注意
.merge
運算符的以下建議...
請注意,合并是為使用異步源或有限源而定制的。當處理尚未在專用計劃程序上發(fā)布的無限源時,您必須將該源隔離在其自己的計劃程序中,否則合并會嘗試在訂閱另一個源之前將其排出。
您的出站助焊劑使用 ,但這只影響在運算符之后鏈接的運算符。即,它不會影響之前的任何內(nèi)容。具體來說,它不會影響 lambda 中的代碼傳遞到執(zhí)行的線程。如果您在每個出站通量之前添加,您可以看到這一點。
.publishOn
.publishOn
.publishOn
Flux.create
.log()
.publishOn
您的 lambda 已傳遞給調(diào)用阻塞方法 ()。
Flux.create
queue.take
由于您在線程中調(diào)用合并的 Flux,因此您的 lambda 將傳遞給線程中的執(zhí)行,并阻止它。subscribe(...)
main
Flux.create
main
最簡單的解決方法是使用而不是使 lambda 中的代碼傳遞到不同的線程(不是 )上運行。這將防止線程阻塞,并允許來自兩個出站流的合并輸出交錯。.subscribeOn
.publishOn
Flux.create
main
main
添加回答
舉報
0/150
提交
取消