我有一個(gè) Go 應(yīng)用程序處理來自單個(gè) RabbitMQ 隊(duì)列的事件。我使用github.com/streadway/amqpRabbitMQ 客戶端庫。Go 應(yīng)用程序在約 2-3 秒內(nèi)處理每條消息。如果我從內(nèi)存中輸入消息,則可以并行處理大約 1000 條甚至更多消息。但不幸的是,RabbitMQ 的性能更差。所以,我想更快地消耗隊(duì)列中的消息。所以,問題是:如何使用最有效的方式消費(fèi)消息github.com/streadway/amqp?據(jù)我了解,有兩種方法:設(shè)置高預(yù)取https://godoc.org/github.com/streadway/amqp#Channel.Qos.使用單一消費(fèi)者 goroutine示例代碼:conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()ch.Qos( 10000, // prefetch count 0, // prefetch size false, // global )msgs, err := ch.Consume( q.Name, // queue "", // consumer false, // NO auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args)for d := range msgs { log.Printf("Received a message: %s", d.Body) err:= processMessage(d) if err != nil { log.Printf("%s : while consuming task", err) d.Nack(false, true) } else { d.Ack(false) } continue // consume other messages}但是processMessage這里會(huì)并行調(diào)用嗎?但這是 RAM 友好的方法嗎?對(duì)于 RabbitMQ 來說,為每個(gè)工作進(jìn)程生成一個(gè)新通道不是很戲劇性嗎?那么,問題是,哪種變體更好?更好的性能、更好的內(nèi)存使用等。那么,這里 RabbitMQ 的最佳用法是什么?更新:目前,我遇到了一個(gè)情況,我的工作人員消耗了 VPS 上的所有 RAM,并且被 OOM 殺死。我使用了第二種方法。因此,就我而言,更好的是能夠讓我的工作人員在工作幾分鐘后不會(huì)被 OOM 殺死。更新2:nack當(dāng)worker無法處理消息時(shí),以及ack當(dāng)worker處理消息時(shí)非常重要。所有消息都必須被處理(其客戶分析),但有時(shí)工作人員無法處理它,因此它必須通過nack消息將其傳遞給其他工作人員(目前,一些用于處理消息的第 3 方 api 有時(shí)只是返回 503 狀態(tài)代碼,在此案例消息應(yīng)傳遞給其他工作人員或重試)。所以,auto-ack不幸的是,使用不是一個(gè)選擇。
1 回答

縹緲止盈
TA貢獻(xiàn)2041條經(jīng)驗(yàn) 獲得超4個(gè)贊
我想每次都processMessage()
在一個(gè)新的 goroutine 中運(yùn)行。
哪個(gè)變體更好?
我更喜歡第一個(gè),因?yàn)榇蜷_/關(guān)閉通道有點(diǎn)昂貴(2 + 2 TCP 數(shù)據(jù)包)。我認(rèn)為你的 OOM 問題與太多 gorutine 無關(guān),gorutine 很輕,只需要 5KB 左右。所以問題很可能是由你的processMessage()
.
我認(rèn)為github.com/streadway/amqp
通道消費(fèi)操作是線程/goroutine安全的,因此如果你只做一些消費(fèi)操作,那么在goruntine之間共享通道是安全的。
- 1 回答
- 0 關(guān)注
- 197 瀏覽
添加回答
舉報(bào)
0/150
提交
取消