3 回答

TA貢獻(xiàn)1871條經(jīng)驗 獲得超8個贊
自從其他兩個答案(盡管信息非常豐富,實際上是錯誤的)以來,我發(fā)布了一個快速更新,并且我不希望其他人從我接受的答案中得到誤導(dǎo)。您不僅可以使用 zmq 做到這一點,它實際上是默認(rèn)行為。
訣竅是,如果您的發(fā)布者客戶端在不斷丟棄消息之前從未連接到訂閱者服務(wù)器(這就是為什么我認(rèn)為它不緩沖消息),但是如果您的發(fā)布者連接到訂閱者并且您重新啟動訂閱者,則發(fā)布者將緩沖消息直到達(dá)到 HWM,這正是我所要求的...所以簡而言之,發(fā)布者想知道另一端是否有人接受消息,只有在此之后它才會緩沖消息...
這是一些示例代碼,演示了這一點(您可能需要進(jìn)行一些基本編輯才能編譯它)。
我只使用了這個依賴項org.zeromq:jeromq:0.5.1。
zmq-publisher.kt
fun main() {
val uri = "tcp://localhost:3006"
val context = ZContext(1)
val socket = context.createSocket(SocketType.PUB)
socket.hwm = 10000
socket.linger = 0
"connecting to $uri".log()
socket.connect(uri)
fun publish(path: String, msg: Msg) {
">> $path | ${msg.json()}".log()
socket.sendMore(path)
socket.send(msg.toByteArray())
}
var count = 0
while (notInterrupted()) {
val msg = telegramMessage("message : ${++count}")
publish("/some/feed", msg)
println()
sleepInterruptible(1.second)
}
}
而且當(dāng)然zmq-subscriber.kt
fun main() {
val uri = "tcp://localhost:3006"
val context = ZContext(1)
val socket = context.createSocket(SocketType.SUB)
socket.hwm = 10000
socket.receiveTimeOut = 250
"connecting to $uri".log()
socket.bind(uri)
socket.subscribe("/some/feed")
while (true) {
val path = socket.recvStr() ?: continue
val bytes = socket.recv()
val msg = Msg.parseFrom(bytes)
"<< $path | ${msg.json()}".log()
}
}
嘗試先在沒有訂閱者的情況下運行發(fā)布者,然后當(dāng)您啟動訂閱者時,您錯過了到目前為止的所有消息...現(xiàn)在不重新啟動發(fā)布者,停止訂閱者等待一段時間并再次啟動它。
這是我的一項服務(wù)實際上從中受益的示例......這是結(jié)構(gòu)[current service]sub:server <= pub:client[service being restarted]sub:server <=* pub:client[multiple publishers]
因為我在中間重新啟動服務(wù),所有發(fā)布者開始緩沖他們的消息,每秒觀察約 200 條消息的最終服務(wù)觀察到下降到 0(那些 1 或 2 是心跳),然后突然爆發(fā) 1000 多條消息,因為所有發(fā)布者都刷新了他們的緩沖區(qū)(重新啟動大約需要 5 秒)...實際上我在這里沒有丟失任何消息...
請注意,您必須有subscriber:server <= publisher:client
一對(這樣發(fā)布者就知道“我需要將這些消息傳遞到一個地方”(您可以嘗試綁定發(fā)布者并連接訂閱者,但您將不會再看到發(fā)布者緩沖消息,因為它是有問題的,如果剛剛斷開連接的訂閱者這樣做是因為它不再需要數(shù)據(jù)或因為它失?。?/p>

TA貢獻(xiàn)1830條經(jīng)驗 獲得超9個贊
首先,歡迎來到 Zen-of-Zero 的世界,其中延遲最為重要
序言:
ZeroMQ 是由 Pieter HINTJENS 的團(tuán)隊設(shè)計的,該團(tuán)隊由經(jīng)驗豐富的大師組成,Martin SUSTRIK 名列第一。該設(shè)計經(jīng)過專業(yè)設(shè)計,以避免任何不必要的延遲。那么詢問是否有(有限的)持久性?不,先生,尚未確認(rèn) -PUB/SUB
可擴展的正式通信模式原型不會內(nèi)置它,因為增加了問題并降低了性能和可擴展性(附加延遲、附加處理、附加內(nèi)存管理)。
如果需要(有限的)持久性(對于缺少遠(yuǎn)程 SUB 端代理的連接),請隨意在應(yīng)用程序端實現(xiàn)它,或者可以設(shè)計和實現(xiàn)一種新的 ZMTP 兼容的此類行為模式原型,擴展 ZeroMQ 框架,如果此類工作進(jìn)入穩(wěn)定且公開接受的狀態(tài),但不要求高性能、消除延遲的標(biāo)準(zhǔn)已經(jīng)打磨了PUB/SUB
幾乎線性的可擴展性 ad astra,以朝這個方向進(jìn)行修改。這絕對不是一條路可走。
解決方案 ?
應(yīng)用程序端可以使用雙指針循環(huán)緩沖區(qū)輕松實現(xiàn)您添加的邏輯,以某種(應(yīng)用程序端管理)的方式工作-?Persistence-PROXY,但位于PUB
發(fā)送方的前面。
如果您的設(shè)計還喜歡使用最近提供的內(nèi)置 ZeroMQ 組件來設(shè)置額外socket_monitor
的控制層并在那里接收事件流(如圖所示),那么您的設(shè)計可能會成功地從 ZeroMQ 內(nèi)部細(xì)節(jié)中擠出一些額外的醬料從 PUB 端Context
實例“內(nèi)部”,其中一些額外的網(wǎng)絡(luò)和連接管理相關(guān)事件可能會給您的(應(yīng)用程序端管理)帶來更多信息-持久性代理
然而,請注意
該
_zmq_socket_monitor()_
方法僅支持面向連接的傳輸,即TCP、IPC和TIPC。
因此,如果計劃使用任何最終有趣的傳輸類,人們可能會直接忘記這一點{ inproc:// | norm:// | pgm:// | epgm:// | vmci:// }
小心 !
“...zmq 優(yōu)化主題發(fā)布?就像如果您持續(xù)
topic
快速發(fā)布大約 100 個字符長的內(nèi)容,它實際上是topic
每次都發(fā)送還是映射到某個 int 并隨后發(fā)送 int...?”
告訴你:
“它總是會發(fā)布
topic.
當(dāng)我使用該pub-sub
模式時,我通常發(fā)布topic
第一個消息,然后發(fā)布實際消息,因此在訂閱者中,我只讀取第一幀并忽略它,然后讀取實際消息”
ZeroMQ 不是這樣工作的。沒有什么是“單獨”<topic>
后面跟著一個<message-body>
,而是相反
主題過濾TOPIC
的機械化工作方式非常不同。
1)你永遠(yuǎn)不知道,誰.connect()
-s:
即幾乎可以肯定版本 2.x 直到版本 4.2+ 將以不同的方式處理主題過濾(ZMTP:RFC 定義初始功能版本握手,以讓Context
-instance決定必須使用哪個版本的主題過濾:
版本 2.x用于將所有消息移動到所有對等方,并讓所有SUB 端(版本 2.x+)傳遞消息(并讓-?SUB
side?Context
-instance 處理本地topic
-list 過濾處理)
,而
4.2+ 版本肯定會topic
在 **PUB-side?Context
-instance 上執(zhí)行 -list 過濾處理(CPU 使用量增長,網(wǎng)絡(luò)傳輸相反),所以你的 SUB-一方永遠(yuǎn)不會傳遞一個字節(jié)的“無用”讀取“未訂閱”的消息。
2)(您可以,但是)不需要將“主題”分離到由此暗示的多幀消息的第一幀中。也許恰恰相反(在高性能、低延遲的分布式系統(tǒng)設(shè)計中這樣做是一種相當(dāng)反模式。
主題過濾過程被定義并按字節(jié)工作,從左到右,每個主題列表成員值與傳遞的消息有效負(fù)載進(jìn)行模式匹配。
添加額外的數(shù)據(jù)、額外的幀管理處理只會增加端到端延遲和處理開銷。這樣做從來都不是一個好主意,而不是正確的分布式系統(tǒng)設(shè)計工作。
尾聲:
職業(yè)比賽中沒有輕而易舉的勝利,也沒有唾手可得的成果分布式系統(tǒng)設(shè)計,如果越少低延遲或超低延遲是設(shè)計目標(biāo)。
另一方面,請確保 ZeroMQ 框架是在考慮到這一點的情況下制定的,并且這些努力以穩(wěn)定的、最終性能良好平衡的工具集為頂峰,這些工具集智能(設(shè)計)、快速(運行)和可擴展(因為地獄可能會這樣)由于這種設(shè)計智慧,人們喜歡使用正確的信號/消息服務(wù)。
希望您對 ZeroMQ 感到滿意,并隨意在您選擇的應(yīng)用程序套件內(nèi)的 ZeroMQ 層“前面”添加任何附加功能集。

TA貢獻(xiàn)1815條經(jīng)驗 獲得超10個贊
發(fā)布者無法在沒有任何連接的情況下緩沖消息,它只會刪除任何新消息:
來自文檔:
如果發(fā)布者沒有連接的訂閱者,那么它將簡單地刪除所有消息。
這意味著您的緩沖區(qū)需要不受 Zeromq 的關(guān)注。您的緩沖區(qū)可以是列表、數(shù)據(jù)庫或您選擇的任何其他存儲方法,但您不能使用發(fā)布者來執(zhí)行此操作。
現(xiàn)在下一個問題是處理如何檢測訂閱者已連接/斷開連接。這需要告訴我們何時需要開始從緩沖區(qū)讀取/填充緩沖區(qū)。
我建議使用Socket.monitor
和監(jiān)聽 和ZMQ_EVENT_CONNECTED
,ZMQ_EVENT_DISCONNECTED
因為它們會告訴您客戶端何時連接/斷開連接,從而使您能夠切換到填充您選擇的緩沖區(qū)。當(dāng)然,可能還有其他不直接涉及 Zeromq 的方法,但這由您決定。
添加回答
舉報