第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機(jī)立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

每當(dāng)它們準(zhǔn)備好而不是間隔輪詢狀態(tài)時,如何使用響應(yīng)式 Flux/Mono 將消息推送到上游?

每當(dāng)它們準(zhǔn)備好而不是間隔輪詢狀態(tài)時,如何使用響應(yīng)式 Flux/Mono 將消息推送到上游?

溫溫醬 2022-04-28 17:00:21
嘗試將消息推送到上游,只要它們可用/準(zhǔn)備好并在刷新后關(guān)閉連接,而不是使用彈簧反應(yīng)通量間隔輪詢消息。@GetMapping(value = "/getValue/{randomId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> statusCheck(@PathVariable("randomId") @NonNull String randomId) {return Flux.<String>interval(Duration.ofSeconds(3))                .map(status -> {                    if (getSomething(randomId).                            equalsIgnoreCase("value"))                        return "value";                    return "ping";                }).take(Duration.ofSeconds(60)).timeout(Duration.ofSeconds(60));    }Kafka 偵聽器在獲取時更新 map 中的 randomId 值,getSomething 方法檢查 map 中間隔中的 randomId 值。因此,我不想檢查間隔并將數(shù)據(jù)存儲在地圖中,而是希望在偵聽器接收到時將消息推送到客戶端。
查看完整描述

2 回答

?
ibeautiful

TA貢獻(xiàn)1993條經(jīng)驗(yàn) 獲得超6個贊

這聽起來像一個Flux.create()請求:


return Flux.<String>create(emitter -> {

     if (getSomething(randomId).equalsIgnoreCase("value")) {

          sink.next("value");

     }

     else {

          sink.next("ping");

     }

  });

/**

 * Programmatically create a {@link Flux} with the capability of emitting multiple

 * elements in a synchronous or asynchronous manner through the {@link FluxSink} API.

 * This includes emitting elements from multiple threads.

 * <p>

 * <img class="marble" src="doc-files/marbles/createForFlux.svg" alt="">

 * <p>

 * This Flux factory is useful if one wants to adapt some other multi-valued async API

 * and not worry about cancellation and backpressure (which is handled by buffering

 * all signals if the downstream can't keep up).

 * <p>

 * For example:

 *

 * <pre><code>

 * Flux.&lt;String&gt;create(emitter -&gt; {

 *

 *     ActionListener al = e -&gt; {

 *         emitter.next(textField.getText());

 *     };

 *     // without cleanup support:

 *

 *     button.addActionListener(al);

 *

 *     // with cleanup support:

 *

 *     button.addActionListener(al);

 *     emitter.onDispose(() -> {

 *         button.removeListener(al);

 *     });

 * });

 * </code></pre>

 *

 * @reactor.discard The {@link FluxSink} exposed by this operator buffers in case of

 * overflow. The buffer is discarded when the main sequence is cancelled.

 *

 * @param <T> The type of values in the sequence

 * @param emitter Consume the {@link FluxSink} provided per-subscriber by Reactor to generate signals.

 * @return a {@link Flux}

 * @see #push(Consumer)

 */

public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter) {


查看完整回答
反對 回復(fù) 2022-04-28
?
躍然一笑

TA貢獻(xiàn)1826條經(jīng)驗(yàn) 獲得超6個贊

我基于這個 stackoverflow Spring 5 Web Reactive - Hot Publishing - How to use EmitterProcessor to bridge a MessageListener to an event stream answer構(gòu)建了解決方案,使用 EmitterProcessor 在消息可用時熱發(fā)布消息。


這是示例代碼


@GetMapping(value = "/getValue/{randomId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)

public Flux<String> statusCheck(@PathVariable("randomId") @NonNull String randomId) {

    EmitterProcessor<String> emitterProcessor = EmitterProcessor.create();

    Flux<String> autoConnect = emitterProcessor.publish().autoConnect();

    FluxSink<String> sink = emitterProcessor.sink();

    //storing randomId and processor sink details

    randomIdMap.putIfAbsent(randomId, emitterProcessor);

    /** This will return ping status to notify client as 

    connection is alive until the randomId message received. **/

    sendPingStatus(sink, randomId);

}

下面的方法顯示了如何在消息到達(dá) kafka 消費(fèi)者并關(guān)閉通量連接時將消息推送到客戶端。


@KafkaListener(topics = "some-subscription-id",

        containerFactory = "kafkaListenerContainerFactory")

public void pushMessage(SomeMessage message, Acknowledgment acknowledgment) {

    EmitterProcessor emitter = randomIdMap.get("randomId");

    if (emitter != null ) {

        emitter.onNext(message);

        emitter.onComplete();

        randomIdMap.remove("randomId");

        acknowledgment.acknowledge();

    }

}


查看完整回答
反對 回復(fù) 2022-04-28
  • 2 回答
  • 0 關(guān)注
  • 173 瀏覽

添加回答

舉報(bào)

0/150
提交
取消
微信客服

購課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學(xué)習(xí)伙伴

公眾號

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號