第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何在子服務(wù)器套接字關(guān)閉時使 ZMQ pub 客戶端套接字緩沖消息

如何在子服務(wù)器套接字關(guān)閉時使 ZMQ pub 客戶端套接字緩沖消息

精慕HU 2023-11-10 16:52:39
給定 2 個應(yīng)用程序,其中應(yīng)用程序 A 使用發(fā)布者客戶端將數(shù)據(jù)有爭議地流式傳輸?shù)綉?yīng)用程序 B,應(yīng)用程序 B 有一個子服務(wù)器套接字來接受該數(shù)據(jù),我們?nèi)绾卧趹?yīng)用程序 A 中配置 pub 客戶端套接字,以便當(dāng) B 不可用時(比如它正在運行)重新部署,重新啟動)A 緩沖所有待處理的消息,當(dāng) B 變得可用時,緩沖的消息會通過并且套接字趕上實時流?簡而言之,當(dāng) SUB SERVER 不可用時,我們?nèi)绾问?PUB CLIENT 套接字緩沖消息有一定的限制?PUB 客戶端的默認(rèn)行為是進(jìn)入靜音狀態(tài),但如果我們可以將其更改為限制大小的緩沖區(qū),那就太好了,zmq 可以嗎?或者我需要在應(yīng)用程序級別上執(zhí)行此操作...我已經(jīng)嘗試在我的套接字中設(shè)置 HWM 和 LINGER,但如果我沒有錯的話,它們只負(fù)責(zé)緩慢的消費者情況,其中我的發(fā)布者連接到訂閱者,但訂閱者太慢以至于發(fā)布者開始緩沖消息(hwm 將限制這些消息的數(shù)量)...我正在使用jeromq,因為我的目標(biāo)是 jvm 平臺。
查看完整描述

3 回答

?
ITMISS

TA貢獻(xiàn)1871條經(jīng)驗 獲得超8個贊

自從其他兩個答案(盡管信息非常豐富,實際上是錯誤的)以來,我發(fā)布了一個快速更新,并且我不希望其他人從我接受的答案中得到誤導(dǎo)。您不僅可以使用 zmq 做到這一點,它實際上是默認(rèn)行為。


訣竅是,如果您的發(fā)布者客戶端在不斷丟棄消息之前從未連接到訂閱者服務(wù)器(這就是為什么我認(rèn)為它不緩沖消息),但是如果您的發(fā)布者連接到訂閱者并且您重新啟動訂閱者,則發(fā)布者將緩沖消息直到達(dá)到 HWM,這正是我所要求的...所以簡而言之,發(fā)布者想知道另一端是否有人接受消息,只有在此之后它才會緩沖消息...


這是一些示例代碼,演示了這一點(您可能需要進(jìn)行一些基本編輯才能編譯它)。


我只使用了這個依賴項org.zeromq:jeromq:0.5.1。


zmq-publisher.kt


fun main() {

   val uri = "tcp://localhost:3006"

   val context = ZContext(1)

   val socket = context.createSocket(SocketType.PUB)


   socket.hwm = 10000

   socket.linger = 0

   "connecting to $uri".log()

   socket.connect(uri)


   fun publish(path: String, msg: Msg) {

      ">> $path | ${msg.json()}".log()

      socket.sendMore(path)

      socket.send(msg.toByteArray())

   }


   var count = 0


   while (notInterrupted()) {

      val msg = telegramMessage("message : ${++count}")

      publish("/some/feed", msg)

      println()


      sleepInterruptible(1.second)

   }

}

而且當(dāng)然zmq-subscriber.kt



fun main() {

   val uri = "tcp://localhost:3006"

   val context = ZContext(1)

   val socket = context.createSocket(SocketType.SUB)


   socket.hwm = 10000

   socket.receiveTimeOut = 250


   "connecting to $uri".log()

   socket.bind(uri)


   socket.subscribe("/some/feed")


   while (true) {

      val path = socket.recvStr() ?: continue

      val bytes = socket.recv()

      val msg = Msg.parseFrom(bytes)

      "<< $path | ${msg.json()}".log()

   }

}

嘗試先在沒有訂閱者的情況下運行發(fā)布者,然后當(dāng)您啟動訂閱者時,您錯過了到目前為止的所有消息...現(xiàn)在不重新啟動發(fā)布者,停止訂閱者等待一段時間并再次啟動它。


這是我的一項服務(wù)實際上從中受益的示例......這是結(jié)構(gòu)[current service]sub:server <= pub:client[service being restarted]sub:server <=* pub:client[multiple publishers]


因為我在中間重新啟動服務(wù),所有發(fā)布者開始緩沖他們的消息,每秒觀察約 200 條消息的最終服務(wù)觀察到下降到 0(那些 1 或 2 是心跳),然后突然爆發(fā) 1000 多條消息,因為所有發(fā)布者都刷新了他們的緩沖區(qū)(重新啟動大約需要 5 秒)...實際上我在這里沒有丟失任何消息...

https://img1.sycdn.imooc.com/654def7b0001af1017080577.jpg

請注意,您必須有subscriber:server <= publisher:client一對(這樣發(fā)布者就知道“我需要將這些消息傳遞到一個地方”(您可以嘗試綁定發(fā)布者并連接訂閱者,但您將不會再看到發(fā)布者緩沖消息,因為它是有問題的,如果剛剛斷開連接的訂閱者這樣做是因為它不再需要數(shù)據(jù)或因為它失?。?/p>



查看完整回答
反對 回復(fù) 2023-11-10
?
慕標(biāo)琳琳

TA貢獻(xiàn)1830條經(jīng)驗 獲得超9個贊

首先,歡迎來到 Zen-of-Zero 的世界,其中延遲最為重要

序言:

ZeroMQ 是由 Pieter HINTJENS 的團(tuán)隊設(shè)計的,該團(tuán)隊由經(jīng)驗豐富的大師組成,Martin SUSTRIK 名列第一。該設(shè)計經(jīng)過專業(yè)設(shè)計,以避免任何不必要的延遲。那么詢問是否有(有限的)持久性?不,先生,尚未確認(rèn) -PUB/SUB可擴展的正式通信模式原型不會內(nèi)置它,因為增加了問題并降低了性能和可擴展性(附加延遲、附加處理、附加內(nèi)存管理)。

如果需要(有限的)持久性(對于缺少遠(yuǎn)程 SUB 端代理的連接),請隨意在應(yīng)用程序端實現(xiàn)它,或者可以設(shè)計和實現(xiàn)一種新的 ZMTP 兼容的此類行為模式原型,擴展 ZeroMQ 框架,如果此類工作進(jìn)入穩(wěn)定且公開接受的狀態(tài),但不要求高性能、消除延遲的標(biāo)準(zhǔn)已經(jīng)打磨了PUB/SUB幾乎線性的可擴展性 ad astra,以朝這個方向進(jìn)行修改。這絕對不是一條路可走。

解決方案 ?

應(yīng)用程序端可以使用雙指針循環(huán)緩沖區(qū)輕松實現(xiàn)您添加的邏輯,以某種(應(yīng)用程序端管理)的方式工作-?Persistence-PROXY,但位于PUB發(fā)送方的前面。

如果您的設(shè)計還喜歡使用最近提供的內(nèi)置 ZeroMQ 組件來設(shè)置額外socket_monitor控制層并在那里接收事件流(如圖所示),那么您的設(shè)計可能會成功地從 ZeroMQ 內(nèi)部細(xì)節(jié)中擠出一些額外的醬料從 PUB 端Context實例“內(nèi)部”,其中一些額外的網(wǎng)絡(luò)和連接管理相關(guān)事件可能會給您的(應(yīng)用程序端管理)帶來更多信息-持久性代理

然而,請注意

_zmq_socket_monitor()_方法僅支持面向連接的傳輸,即TCP、IPC和TIPC。

因此,如果計劃使用任何最終有趣的傳輸類,人們可能會直接忘記這一點{ inproc:// | norm:// | pgm:// | epgm:// | vmci:// }


小心 !

“...zmq 優(yōu)化主題發(fā)布?就像如果您持續(xù)topic快速發(fā)布大約 100 個字符長的內(nèi)容,它實際上是topic每次都發(fā)送還是映射到某個 int 并隨后發(fā)送 int...?”

告訴你:

“它總是會發(fā)布topic.當(dāng)我使用該pub-sub模式時,我通常發(fā)布topic第一個消息,然后發(fā)布實際消息,因此在訂閱者中,我只讀取第一幀并忽略它,然后讀取實際消息”

ZeroMQ 不是這樣工作的。沒有什么是“單獨”<topic>后面跟著一個<message-body>,而是相反

主題過濾TOPIC的機械化工作方式非常不同。

1)你永遠(yuǎn)不知道,誰.connect()-s:
即幾乎可以肯定版本 2.x 直到版本 4.2+ 將以不同的方式處理主題過濾(ZMTP:RFC 定義初始功能版本握手,以讓Context-instance決定必須使用哪個版本的主題過濾:
版本 2.x用于將所有消息移動到所有對等方,并讓所有SUB 端(版本 2.x+)傳遞消息(并讓-?SUBside?Context-instance 處理本地topic-list 過濾處理

,而
4.2+ 版本肯定會topic在 **PUB-side?Context-instance 上執(zhí)行 -list 過濾處理(CPU 使用量增長,網(wǎng)絡(luò)傳輸相反),所以你的 SUB-一方永遠(yuǎn)不會傳遞一個字節(jié)的“無用”讀取“未訂閱”的消息。

2)(您可以,但是)不需要將“主題”分離到由此暗示的多幀消息的第一幀中。也許恰恰相反(在高性能、低延遲的分布式系統(tǒng)設(shè)計中這樣做是一種相當(dāng)反模式。

主題過濾過程被定義并按字節(jié)工作,從左到右,每個主題列表成員值與傳遞的消息有效負(fù)載進(jìn)行模式匹配。

添加額外的數(shù)據(jù)、額外的幀管理處理只會增加端到端延遲和處理開銷。這樣做從來都不是一個好主意,而不是正確的分布式系統(tǒng)設(shè)計工作。


尾聲:

職業(yè)比賽中沒有輕而易舉的勝利,也沒有唾手可得的成果分布式系統(tǒng)設(shè)計,如果越少低延遲或超低延遲是設(shè)計目標(biāo)。

另一方面,請確保 ZeroMQ 框架是在考慮到這一點的情況下制定的,并且這些努力以穩(wěn)定的、最終性能良好平衡的工具集為頂峰,這些工具集智能(設(shè)計)、快速(運行)和可擴展(因為地獄可能會這樣)由于這種設(shè)計智慧,人們喜歡使用正確的信號/消息服務(wù)。

希望您對 ZeroMQ 感到滿意,并隨意在您選擇的應(yīng)用程序套件內(nèi)的 ZeroMQ 層“前面”添加任何附加功能集。


查看完整回答
反對 回復(fù) 2023-11-10
?
動漫人物

TA貢獻(xiàn)1815條經(jīng)驗 獲得超10個贊

發(fā)布者無法在沒有任何連接的情況下緩沖消息,它只會刪除任何新消息:

來自文檔:

如果發(fā)布者沒有連接的訂閱者,那么它將簡單地刪除所有消息。

這意味著您的緩沖區(qū)需要不受 Zeromq 的關(guān)注。您的緩沖區(qū)可以是列表、數(shù)據(jù)庫或您選擇的任何其他存儲方法,但您不能使用發(fā)布者來執(zhí)行此操作。

現(xiàn)在下一個問題是處理如何檢測訂閱者已連接/斷開連接。這需要告訴我們何時需要開始從緩沖區(qū)讀取/填充緩沖區(qū)。

我建議使用Socket.monitor和監(jiān)聽 和ZMQ_EVENT_CONNECTED,ZMQ_EVENT_DISCONNECTED因為它們會告訴您客戶端何時連接/斷開連接,從而使您能夠切換到填充您選擇的緩沖區(qū)。當(dāng)然,可能還有其他不直接涉及 Zeromq 的方法,但這由您決定。


查看完整回答
反對 回復(fù) 2023-11-10
  • 3 回答
  • 0 關(guān)注
  • 217 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學(xué)習(xí)伙伴

公眾號

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號