第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何在 grpc 中正確設(shè)計發(fā)布-訂閱模式?

如何在 grpc 中正確設(shè)計發(fā)布-訂閱模式?

喵喵時光機 2022-07-14 17:05:20
我正在嘗試使用 grpc 實現(xiàn) pub sub 模式,但我對如何正確執(zhí)行它有點困惑。我的原型: rpc call (google.protobuf.Empty) returns (stream Data);客戶:asynStub.call(Empty.getDefaultInstance(), new StreamObserver<Data>() {         @Override         public void onNext(Data value) {           // process a data         @Override         public void onError(Throwable t) {         }         @Override         public void onCompleted() {         }       });   } catch (StatusRuntimeException e) {     LOG.warn("RPC failed: {}", e.getStatus());   }   Thread.currentThread().join();服務(wù)器服務(wù):public class Sender extends DataServiceGrpc.DataServiceImplBase implements Runnable {  private final BlockingQueue<Data> queue;  private final static HashSet<StreamObserver<Data>> observers = new LinkedHashSet<>();  public Sender(BlockingQueue<Data> queue) {    this.queue = queue;  }  @Override  public void data(Empty request, StreamObserver<Data> responseObserver) {    observers.add(responseObserver);  }  @Override  public void run() {    while (!Thread.currentThread().isInterrupted()) {      try {        // waiting for first element        Data data = queue.take();        // send head element        observers.forEach(o -> o.onNext(data));      } catch (InterruptedException e) {        LOG.error("error: ", e);        Thread.currentThread().interrupt();      }    }  }}如何正確地從全局觀察者中刪除客戶?連接斷開時如何接收某種信號?如何管理客戶端-服務(wù)器重新連接?連接斷開時如何強制客戶端重新連接?
查看完整描述

1 回答

?
慕虎7371278

TA貢獻1802條經(jīng)驗 獲得超4個贊

在實施您的服務(wù)時:


  @Override

  public void data(Empty request, StreamObserver<Data> responseObserver) {

    observers.add(responseObserver);

  }

您需要獲取當前請求的上下文,并監(jiān)聽取消。對于單請求、多響應(yīng)調(diào)用(又名服務(wù)器流),gRPC 生成的代碼被簡化為直接傳遞請求。這意味著您無法直接訪問底層ServerCall.Listener,這就是您通常如何監(jiān)聽客戶端斷開和取消的方式。


相反,每個 gRPC 調(diào)用都有一個Context與之關(guān)聯(lián)的,它攜帶取消和其他請求范圍的信號。對于您的情況,您只需要通過添加自己的偵聽器來偵聽取消,然后從鏈接的哈希集中安全地刪除響應(yīng)觀察器。


至于重新連接:如果連接斷開,gRPC 客戶端會自動重新連接,但通常不會重試 RPC,除非這樣做是安全的。在服務(wù)器流式 RPC 的情況下,這樣做通常不安全,因此您需要直接在客戶端上重試 RPC。


查看完整回答
反對 回復 2022-07-14
  • 1 回答
  • 0 關(guān)注
  • 237 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學習伙伴

公眾號

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號