3 回答

TA貢獻(xiàn)1995條經(jīng)驗(yàn) 獲得超2個(gè)贊
使用Flowable::blockingSubscribe()
- 將當(dāng)前 Flowable 運(yùn)行到終端事件,忽略任何值并重新拋出任何異常。

TA貢獻(xiàn)1799條經(jīng)驗(yàn) 獲得超8個(gè)贊
由于 RxJava 是異步的,observable 下面的 java 代碼將運(yùn)行,而 observable 將在不同的線程中運(yùn)行,這就是為什么如果您想在 Flowable 已完成發(fā)送數(shù)據(jù)時(shí)收到通知,您應(yīng)該在 RxJava 流中執(zhí)行此操作。為此,您有一個(gè)運(yùn)算符 .doOnComplete 這里有一個(gè)示例如何檢測(cè)流何時(shí)完成
Flowable.range(0, 100).parallel()
.runOn(Schedulers.computation())
.map(integer -> {
return integer;
})
.sequential()
.doOnComplete(() -> {
System.out.println("finished");
})
.subscribe(integer -> System.out.println(integer));

TA貢獻(xiàn)1863條經(jīng)驗(yàn) 獲得超2個(gè)贊
您可以使用 AtomicBoolean,將其初始化為 false 并使用 將其設(shè)置為 true doFinally()。
doFinally()在 Observable 發(fā)出 onError 或 onCompleted 信號(hào)后調(diào)用,或者被下游處理。
然后讓主線程休眠,直到completedvalue 為 true。
使用你的例子:
AtomicBoolean completed = new AtomicBoolean(false);
myFlowable.parallel()
.runOn(Schedulers.computation())
.map(val -> myCollection.add(val))
.sequential()
.doFinally(() -> completed.set(true))
.subscribe(val -> {
...
});
...
try {
while(!completed.get()){
Thread.sleep(1000);
...
}
...
} catch (InterruptedException e) {
e.printStackTrace();
}
添加回答
舉報(bào)