我想創(chuàng)建一個Flux具有按需生成的元素且預取有限的元素。我嘗試了以下操作,但看起來這段代碼無法處理背壓,因為它generateElements變得非常大 (1011):AtomicInteger generateElements = new AtomicInteger(0);Flux<Integer> source = Flux .create(emitter -> { while (true) emitter.next(generateElements.getAndIncrement()); }) .subsribeOn(Schedulers.elastic()) .limitRate(1);source.take(4).subsribe(System.out::println);assertThat(generateElements.get()).isEqualTo(5);我怎樣才能使我的Flux預取僅限于一次?
1 回答

森欄
TA貢獻1810條經驗 獲得超5個贊
您可以Flux.generate
在定義單個項目發(fā)射的地方使用 which expects a callable:
AtomicInteger?generateElements?=?new?AtomicInteger(0); Flux.generate(emitter?->?emitter.next(generateElements.getAndIncrement())) ????.subscribeOn(Schedulers.elastic()) ????.take(4) ????.subscribe(System.out::println);
添加回答
舉報
0/150
提交
取消