我正在嘗試使用 grpc 實現(xiàn) pub sub 模式,但我對如何正確執(zhí)行它有點困惑。我的原型: rpc call (google.protobuf.Empty) returns (stream Data);客戶:asynStub.call(Empty.getDefaultInstance(), new StreamObserver<Data>() { @Override public void onNext(Data value) { // process a data @Override public void onError(Throwable t) { } @Override public void onCompleted() { } }); } catch (StatusRuntimeException e) { LOG.warn("RPC failed: {}", e.getStatus()); } Thread.currentThread().join();服務(wù)器服務(wù):public class Sender extends DataServiceGrpc.DataServiceImplBase implements Runnable { private final BlockingQueue<Data> queue; private final static HashSet<StreamObserver<Data>> observers = new LinkedHashSet<>(); public Sender(BlockingQueue<Data> queue) { this.queue = queue; } @Override public void data(Empty request, StreamObserver<Data> responseObserver) { observers.add(responseObserver); } @Override public void run() { while (!Thread.currentThread().isInterrupted()) { try { // waiting for first element Data data = queue.take(); // send head element observers.forEach(o -> o.onNext(data)); } catch (InterruptedException e) { LOG.error("error: ", e); Thread.currentThread().interrupt(); } } }}如何正確地從全局觀察者中刪除客戶?連接斷開時如何接收某種信號?如何管理客戶端-服務(wù)器重新連接?連接斷開時如何強制客戶端重新連接?
1 回答

慕虎7371278
TA貢獻1802條經(jīng)驗 獲得超4個贊
在實施您的服務(wù)時:
@Override
public void data(Empty request, StreamObserver<Data> responseObserver) {
observers.add(responseObserver);
}
您需要獲取當前請求的上下文,并監(jiān)聽取消。對于單請求、多響應(yīng)調(diào)用(又名服務(wù)器流),gRPC 生成的代碼被簡化為直接傳遞請求。這意味著您無法直接訪問底層ServerCall.Listener,這就是您通常如何監(jiān)聽客戶端斷開和取消的方式。
相反,每個 gRPC 調(diào)用都有一個Context與之關(guān)聯(lián)的,它攜帶取消和其他請求范圍的信號。對于您的情況,您只需要通過添加自己的偵聽器來偵聽取消,然后從鏈接的哈希集中安全地刪除響應(yīng)觀察器。
至于重新連接:如果連接斷開,gRPC 客戶端會自動重新連接,但通常不會重試 RPC,除非這樣做是安全的。在服務(wù)器流式 RPC 的情況下,這樣做通常不安全,因此您需要直接在客戶端上重試 RPC。
添加回答
舉報
0/150
提交
取消