我想創(chuàng)建一個(gè)Flux具有按需生成的元素且預(yù)取有限的元素。我嘗試了以下操作,但看起來這段代碼無法處理背壓,因?yàn)樗黦enerateElements變得非常大 (1011):AtomicInteger generateElements = new AtomicInteger(0);Flux<Integer> source = Flux .create(emitter -> { while (true) emitter.next(generateElements.getAndIncrement()); }) .subsribeOn(Schedulers.elastic()) .limitRate(1);source.take(4).subsribe(System.out::println);assertThat(generateElements.get()).isEqualTo(5);我怎樣才能使我的Flux預(yù)取僅限于一次?
1 回答

森欄
TA貢獻(xiàn)1810條經(jīng)驗(yàn) 獲得超5個(gè)贊
您可以Flux.generate
在定義單個(gè)項(xiàng)目發(fā)射的地方使用 which expects a callable:
AtomicInteger?generateElements?=?new?AtomicInteger(0); Flux.generate(emitter?->?emitter.next(generateElements.getAndIncrement())) ????.subscribeOn(Schedulers.elastic()) ????.take(4) ????.subscribe(System.out::println);
添加回答
舉報(bào)
0/150
提交
取消