1 回答

TA貢獻(xiàn)1790條經(jīng)驗(yàn) 獲得超9個(gè)贊
如果您可以通過(guò)檢查識(shí)別出最后Item
發(fā)出的信號(hào),則可以使用運(yùn)算符:getNew
.expand
? ?public Flux<Item> observe(long id) {
? ? ? ? return getNew(id)
? ? ? ? ? ? ? ? .expand(item -> isLast(item)
? ? ? ? ? ? ? ? ? ? ? ? ? getNew(item.id)
? ? ? ? ? ? ? ? ? ? ? ? : Flux.empty());
? ? }
? ? /**
? ? ?* @return true if the given item is the last item emitted by getNew
? ? ?*/
? ? private boolean isLast(Item item) {
? ? ? ? return // ... snip ...
? ? }
如果您不能通過(guò)檢查來(lái)識(shí)別最后一個(gè)Item
,那么您將不得不使用狀態(tài)變量。雖然,我建議使用.defer
and.repeat
而不是?.interval
...
?public Flux<Item> observe(long id) {
? ? ? ? final AtomicLong nextStartId = new AtomicLong(id);
? ? ? ? return Flux.defer(() -> getNew(nextStartId.get()))
? ? ? ? ? ? ? ? .doOnNext(item -> nextStartId.set(item.id))
? ? ? ? ? ? ? ? .repeat();
? ? }
反對(duì)使用的主要原因.interval
是:
如果沒(méi)有及時(shí)產(chǎn)生需求,將發(fā)出 onError 信號(hào)
因此,如果 API 花費(fèi)的時(shí)間太長(zhǎng),或者處理結(jié)果的時(shí)間太長(zhǎng),流將以錯(cuò)誤結(jié)束。對(duì)于較長(zhǎng)的間隔,這可能不是問(wèn)題,但對(duì)于相對(duì)較短的間隔(例如您的示例中的 1 秒),這可能是一個(gè)問(wèn)題。
如果你想在每次重復(fù)迭代之前延遲,那么你可以使用.repeatWhen
, 帶有 reactor-extra 的Repeat
固定退避。這將為您提供“固定延遲”語(yǔ)義,而不是“固定間隔”。
添加回答
舉報(bào)