我正在編寫客戶端-服務器應用程序。我從數(shù)據(jù)庫中獲取數(shù)據(jù)并將其放入 rxjava2 的 ReplaySubject(ReplaySubject 是必需的,因為我需要保證每個客戶端上的數(shù)據(jù)相同)當客戶端連接訂閱它時,我想將此數(shù)據(jù)發(fā)送給他但是當我嘗試它時我的頭“可能的方式^_^”它阻止了。通過塊我的意思是它不發(fā)送數(shù)據(jù)但是當我關閉服務器數(shù)據(jù)時立即顯示在客戶端。我嘗試在客戶端和服務器端事件循環(huán)中添加一些線程(我在想可能是線程塊,因為我使用“無限”源所以接收這個我需要另一個線程或類似的東西)。服務器端通道代碼:public class ClientHandler extends SimpleChannelInboundHandler<DataWrapper> { private final Observable<DataWrapper> data; public ClientHandler(Observable<DataWrapper> data) { this.data = data; } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { // super.channelRegistered(ctx); final Channel channel = ctx.channel(); Server .INSTANCE .appendToChannelGroup(channel); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // super.channelActive(ctx); // i believe there is something wrong data.subscribe(ctx::writeAndFlush); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } // rest skip}客戶端:public class DirectNetworkCommunicator extends SimpleChannelInboundHandler<DataWrapper> { private Observable<DataWrapper> generatedData; private ExecutorService fallbackThread; DirectNetworkCommunicator(Observable<DataWrapper> generatedData) { this.fallbackThread = Executors.newSingleThreadExecutor(); this.generatedData = generatedData; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // super.channelRead(ctx, msg); DataWrapper inComingData = (DataWrapper) msg; Adapter .INSTANCE .appendFromNettworkData(inComingData); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // super.channelReadComplete(ctx); ctx.flush(); } // rest skip}所以我之前提到過我希望它在服務器關閉時接收數(shù)據(jù),而不是在服務器關閉時接收數(shù)據(jù) ^_^。如果那會幫助 netty 版本 4.1.37 final。
1 回答

汪汪一只貓
TA貢獻1898條經(jīng)驗 獲得超8個贊
好的,所以未來的人們會面臨同樣的問題,我自己找到了答案。來自客戶端的 Netty 使用后臺線程作為通信的主要線程,這意味著我要等待主線程釋放,然后它才能對 observable 進行操作。希望它能幫助別人。
添加回答
舉報
0/150
提交
取消