2 回答

TA貢獻(xiàn)1864條經(jīng)驗(yàn) 獲得超2個(gè)贊
為什么不將消息發(fā)送給一群工作人員呢?
像這樣的東西:
...
const workerPoolSize = 10 // the number of workers you want to have
wg := &sync.WaitGroup{}
wCh := make(chan string)
wg.Add(workerPoolSize) // you want to wait for 10 workers to finish the job
// run workers in goroutines
for i := 0; i < workerPoolSize; i++ {
go func(wch <-chan string) {
// get the data from the channel
for text := range wch {
c.Publish("logs", 0, false, text)
token.Wait()
}
wg.Done() // worker says that he finishes the job
}(wCh)
}
for i := 0; i < 5; i++ {
// put the data to the channel
wCh <- fmt.Sprintf("this is msg #%d!", i)
}
close(wCh)
wg.Wait() // wait for all workers to finish
...

TA貢獻(xiàn)1813條經(jīng)驗(yàn) 獲得超2個(gè)贊
當(dāng)您說(shuō)“在執(zhí)行高延遲任務(wù)時(shí)大量傳遞消息”時(shí),我假設(shè)您的意思是您想要異步發(fā)送消息(因此消息由與主代碼運(yùn)行不同的 go 例程處理)。
如果是這種情況,那么對(duì)您的初始示例進(jìn)行非常簡(jiǎn)單的更改將為您提供:
for i := 0; i < 5; i++ {
? ? ? ? text := fmt.Sprintf("this is msg #%d!", i)
? ? ? ? token := c.Publish("logs", 0, false, text)
? ? ? ? // comment out... token.Wait()
? ? }
注意:您的示例代碼可能會(huì)在消息實(shí)際發(fā)送之前退出;添加 time.Sleep(10 * time.Second) 會(huì)給它時(shí)間讓它們熄滅;請(qǐng)參閱下面的代碼了解處理此問(wèn)題的另一種方法
您的初始代碼在消息發(fā)送之前停止的唯一原因是您調(diào)用了 token.Wait()。如果您不關(guān)心錯(cuò)誤(并且您不檢查錯(cuò)誤,所以我假設(shè)您不關(guān)心),那么調(diào)用 token.Wait() 就沒(méi)有什么意義(它只是等待消息發(fā)送;消息將消失無(wú)論你是否調(diào)用 token.Wait() )。
如果您想記錄任何錯(cuò)誤,您可以使用類似以下內(nèi)容:
for i := 0; i < 5; i++ {
? ? ? ? text := fmt.Sprintf("this is msg #%d!", i)
? ? ? ? token := c.Publish("logs", 0, false, text)
? ? ? ? go func(){
? ? ? ? ? ? token.Wait()
? ? ? ? ? ? err := token.Error()
? ? ? ? ? ? if err != nil {
? ? ? ? ? ? ? ? fmt.Printf("Error: %s\n", err.Error()) // or whatever you want to do with your error
? ? ? ? ? ? }
? ? ? ? }()
? ? }
請(qǐng)注意,如果消息傳遞至關(guān)重要(但由于您沒(méi)有檢查錯(cuò)誤,我假設(shè)它不是),您還需要做一些其他事情。
就您找到的代碼而言;我懷疑這會(huì)增加您不需要的復(fù)雜性(并且需要更多信息才能解決此問(wèn)題;例如,MqttProtocol 結(jié)構(gòu)未在您粘貼的位中定義)。
額外的一點(diǎn)......在您的評(píng)論中您提到“發(fā)布的消息必須排序”。如果這是必要的(因此您想等到每條消息都已送達(dá)后再發(fā)送另一條消息),那么您需要類似以下內(nèi)容:
msgChan := make(chan string, 200) // Allow a queue of up to 200 messages
var wg sync.WaitGroup
wg.Add(1)
go func(){ // go routine to send messages from channel
? ? for msg := range msgChan {
? ? ? ? token := c.Publish("logs", 2, false, msg) // Use QOS2 is order is vital
? ? ? ? token.Wait()
? ? ? ? // should check for errors here
? ? }
? ? wg.Done()
}()
for i := 0; i < 5; i++ {
? ? ? ? text := fmt.Sprintf("this is msg #%d!", i)
? ? ? ? msgChan <- text
? ? }
close(msgChan) // this will stop the goroutine (when all messages processed)
wg.Wait() // Wait for all messages to be sent before exiting (may wait for ever is mqtt broker down!)
注意:這與 Ilya Kaznacheev 的解決方案類似(如果將workerPoolSize設(shè)置為1并使通道緩沖)
正如您的評(píng)論表明等待組使這一點(diǎn)難以理解,這里是另一種可能更清晰的等待方式(等待組通常在您等待多件事情完成時(shí)使用;在這個(gè)例子中,我們只等待一件事情,所以可以使用更簡(jiǎn)單的方法)
msgChan := make(chan string, 200) // Allow a queue of up to 200 messages
done := make(chan struct{}) // channel used to indicate when go routine has finnished
go func(){ // go routine to send messages from channel
? ? for msg := range msgChan {
? ? ? ? token := c.Publish("logs", 2, false, msg) // Use QOS2 is order is vital
? ? ? ? token.Wait()
? ? ? ? // should check for errors here
? ? }
? ? close(done) // let main routine know we have finnished
}()
for i := 0; i < 5; i++ {
? ? ? ? text := fmt.Sprintf("this is msg #%d!", i)
? ? ? ? msgChan <- text
? ? }
close(msgChan) // this will stop the goroutine (when all messages processed)
<-done // wait for publish go routine to complete
- 2 回答
- 0 關(guān)注
- 186 瀏覽
添加回答
舉報(bào)