第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定
已解決430363個(gè)問(wèn)題,去搜搜看,總會(huì)有你想問(wèn)的

使用此代碼(Paho MQTT)作為 GoRoutine 并通過(guò)通道傳遞消息以通過(guò)

使用此代碼(Paho MQTT)作為 GoRoutine 并通過(guò)通道傳遞消息以通過(guò)

Go
侃侃無(wú)極 2023-07-26 17:20:34
作為標(biāo)準(zhǔn)代碼,我用來(lái)發(fā)布消息以進(jìn)行測(cè)試:func main() {    opts := MQTT.NewClientOptions().AddBroker("tcp://127.0.0.1:1883")    opts.SetClientID("myclientid_")    opts.SetDefaultPublishHandler(f)    opts.SetConnectionLostHandler(connLostHandler)    opts.OnConnect = func(c MQTT.Client) {        fmt.Printf("Client connected, subscribing to: test/topic\n")        if token := c.Subscribe("logs", 0, nil); token.Wait() && token.Error() != nil {            fmt.Println(token.Error())            os.Exit(1)        }    }    c := MQTT.NewClient(opts)    if token := c.Connect(); token.Wait() && token.Error() != nil {        panic(token.Error())    }    for i := 0; i < 5; i++ {        text := fmt.Sprintf("this is msg #%d!", i)        token := c.Publish("logs", 0, false, text)        token.Wait()    }    time.Sleep(3 * time.Second)    if token := c.Unsubscribe("logs"); token.Wait() && token.Error() != nil {        fmt.Println(token.Error())        os.Exit(1)    }    c.Disconnect(250)}這個(gè)效果很好!但是在執(zhí)行高延遲任務(wù)時(shí)大量傳遞消息,我的程序性能會(huì)很低,所以我必須使用 goroutine 和 Channel。這段代碼正是我想要的!但作為 Golang 中的菜鳥(niǎo),我不知道如何START()在主函數(shù)中運(yùn)行函數(shù)以及要傳遞什么參數(shù)!特別是,我將如何使用通道將消息傳遞給工作人員(發(fā)布者)?!我們將不勝感激您的幫助!
查看完整描述

2 回答

?
慕斯王

TA貢獻(xiàn)1864條經(jīng)驗(yàn) 獲得超2個(gè)贊

為什么不將消息發(fā)送給一群工作人員呢?


像這樣的東西:


...

    const workerPoolSize = 10 // the number of workers you want to have

    wg := &sync.WaitGroup{}

    wCh := make(chan string)

    wg.Add(workerPoolSize) // you want to wait for 10 workers to finish the job


    // run workers in goroutines

    for i := 0; i < workerPoolSize; i++ {

        go func(wch <-chan string) {

            // get the data from the channel

            for text := range wch {

                c.Publish("logs", 0, false, text)

                token.Wait()

            }

            wg.Done() // worker says that he finishes the job

        }(wCh)

    }


    for i := 0; i < 5; i++ {

        // put the data to the channel

        wCh <- fmt.Sprintf("this is msg #%d!", i)

    }


        close(wCh)

    wg.Wait() // wait for all workers to finish

...


查看完整回答
反對(duì) 回復(fù) 2023-07-26
?
慕姐8265434

TA貢獻(xiàn)1813條經(jīng)驗(yàn) 獲得超2個(gè)贊

當(dāng)您說(shuō)“在執(zhí)行高延遲任務(wù)時(shí)大量傳遞消息”時(shí),我假設(shè)您的意思是您想要異步發(fā)送消息(因此消息由與主代碼運(yùn)行不同的 go 例程處理)。

如果是這種情況,那么對(duì)您的初始示例進(jìn)行非常簡(jiǎn)單的更改將為您提供:

for i := 0; i < 5; i++ {

? ? ? ? text := fmt.Sprintf("this is msg #%d!", i)

? ? ? ? token := c.Publish("logs", 0, false, text)

? ? ? ? // comment out... token.Wait()

? ? }

注意:您的示例代碼可能會(huì)在消息實(shí)際發(fā)送之前退出;添加 time.Sleep(10 * time.Second) 會(huì)給它時(shí)間讓它們熄滅;請(qǐng)參閱下面的代碼了解處理此問(wèn)題的另一種方法


您的初始代碼在消息發(fā)送之前停止的唯一原因是您調(diào)用了 token.Wait()。如果您不關(guān)心錯(cuò)誤(并且您不檢查錯(cuò)誤,所以我假設(shè)您不關(guān)心),那么調(diào)用 token.Wait() 就沒(méi)有什么意義(它只是等待消息發(fā)送;消息將消失無(wú)論你是否調(diào)用 token.Wait() )。


如果您想記錄任何錯(cuò)誤,您可以使用類似以下內(nèi)容:


for i := 0; i < 5; i++ {

? ? ? ? text := fmt.Sprintf("this is msg #%d!", i)

? ? ? ? token := c.Publish("logs", 0, false, text)

? ? ? ? go func(){

? ? ? ? ? ? token.Wait()

? ? ? ? ? ? err := token.Error()

? ? ? ? ? ? if err != nil {

? ? ? ? ? ? ? ? fmt.Printf("Error: %s\n", err.Error()) // or whatever you want to do with your error

? ? ? ? ? ? }

? ? ? ? }()

? ? }

請(qǐng)注意,如果消息傳遞至關(guān)重要(但由于您沒(méi)有檢查錯(cuò)誤,我假設(shè)它不是),您還需要做一些其他事情。


就您找到的代碼而言;我懷疑這會(huì)增加您不需要的復(fù)雜性(并且需要更多信息才能解決此問(wèn)題;例如,MqttProtocol 結(jié)構(gòu)未在您粘貼的位中定義)。


額外的一點(diǎn)......在您的評(píng)論中您提到“發(fā)布的消息必須排序”。如果這是必要的(因此您想等到每條消息都已送達(dá)后再發(fā)送另一條消息),那么您需要類似以下內(nèi)容:


msgChan := make(chan string, 200) // Allow a queue of up to 200 messages

var wg sync.WaitGroup

wg.Add(1)

go func(){ // go routine to send messages from channel

? ? for msg := range msgChan {

? ? ? ? token := c.Publish("logs", 2, false, msg) // Use QOS2 is order is vital

? ? ? ? token.Wait()

? ? ? ? // should check for errors here

? ? }

? ? wg.Done()

}()


for i := 0; i < 5; i++ {

? ? ? ? text := fmt.Sprintf("this is msg #%d!", i)

? ? ? ? msgChan <- text

? ? }

close(msgChan) // this will stop the goroutine (when all messages processed)

wg.Wait() // Wait for all messages to be sent before exiting (may wait for ever is mqtt broker down!)

注意:這與 Ilya Kaznacheev 的解決方案類似(如果將workerPoolSize設(shè)置為1并使通道緩沖)


正如您的評(píng)論表明等待組使這一點(diǎn)難以理解,這里是另一種可能更清晰的等待方式(等待組通常在您等待多件事情完成時(shí)使用;在這個(gè)例子中,我們只等待一件事情,所以可以使用更簡(jiǎn)單的方法)


msgChan := make(chan string, 200) // Allow a queue of up to 200 messages

done := make(chan struct{}) // channel used to indicate when go routine has finnished


go func(){ // go routine to send messages from channel

? ? for msg := range msgChan {

? ? ? ? token := c.Publish("logs", 2, false, msg) // Use QOS2 is order is vital

? ? ? ? token.Wait()

? ? ? ? // should check for errors here

? ? }

? ? close(done) // let main routine know we have finnished

}()


for i := 0; i < 5; i++ {

? ? ? ? text := fmt.Sprintf("this is msg #%d!", i)

? ? ? ? msgChan <- text

? ? }

close(msgChan) // this will stop the goroutine (when all messages processed)

<-done // wait for publish go routine to complete


查看完整回答
反對(duì) 回復(fù) 2023-07-26
  • 2 回答
  • 0 關(guān)注
  • 186 瀏覽
慕課專欄
更多

添加回答

舉報(bào)

0/150
提交
取消
微信客服

購(gòu)課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)