第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

當我從 ReplaySubject 使用 Observable 時阻止

當我從 ReplaySubject 使用 Observable 時阻止

慕仙森 2023-05-17 17:56:28
我正在編寫客戶端-服務器應用程序。我從數(shù)據(jù)庫中獲取數(shù)據(jù)并將其放入 rxjava2 的 ReplaySubject(ReplaySubject 是必需的,因為我需要保證每個客戶端上的數(shù)據(jù)相同)當客戶端連接訂閱它時,我想將此數(shù)據(jù)發(fā)送給他但是當我嘗試它時我的頭“可能的方式^_^”它阻止了。通過塊我的意思是它不發(fā)送數(shù)據(jù)但是當我關閉服務器數(shù)據(jù)時立即顯示在客戶端。我嘗試在客戶端和服務器端事件循環(huán)中添加一些線程(我在想可能是線程塊,因為我使用“無限”源所以接收這個我需要另一個線程或類似的東西)。服務器端通道代碼:public    class ClientHandler        extends SimpleChannelInboundHandler<DataWrapper> {    private final Observable<DataWrapper> data;    public ClientHandler(Observable<DataWrapper> data) {        this.data = data;    }    @Override    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {        // super.channelRegistered(ctx);        final Channel channel = ctx.channel();        Server            .INSTANCE            .appendToChannelGroup(channel);    }    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        // super.channelActive(ctx);        // i believe there is something wrong        data.subscribe(ctx::writeAndFlush);    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {        ctx.flush();    }    // rest skip}客戶端:public    class DirectNetworkCommunicator        extends SimpleChannelInboundHandler<DataWrapper> {    private Observable<DataWrapper> generatedData;    private ExecutorService fallbackThread;    DirectNetworkCommunicator(Observable<DataWrapper> generatedData) {        this.fallbackThread = Executors.newSingleThreadExecutor();        this.generatedData = generatedData;    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        // super.channelRead(ctx, msg);        DataWrapper inComingData = (DataWrapper) msg;        Adapter            .INSTANCE            .appendFromNettworkData(inComingData);    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {        // super.channelReadComplete(ctx);        ctx.flush();    }    // rest skip}所以我之前提到過我希望它在服務器關閉時接收數(shù)據(jù),而不是在服務器關閉時接收數(shù)據(jù) ^_^。如果那會幫助 netty 版本 4.1.37 final。
查看完整描述

1 回答

?
汪汪一只貓

TA貢獻1898條經(jīng)驗 獲得超8個贊

好的,所以未來的人們會面臨同樣的問題,我自己找到了答案。來自客戶端的 Netty 使用后臺線程作為通信的主要線程,這意味著我要等待主線程釋放,然后它才能對 observable 進行操作。希望它能幫助別人。



查看完整回答
反對 回復 2023-05-17
  • 1 回答
  • 0 關注
  • 146 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網(wǎng)微信公眾號