第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何在 Go 中執(zhí)行并發(fā)下載

如何在 Go 中執(zhí)行并發(fā)下載

Go
慕哥6287543 2021-11-08 10:22:08
我們有一個流程,用戶可以通過該流程請求我們需要從源頭獲取的文件。此來源不是最可靠的,因此我們使用 Amazon SQS 實施了一個隊列。我們將下載 URL 放入隊列中,然后使用我們用 Go 編寫的一個小應用程序對其進行輪詢。這個應用程序只是檢索消息,下載文件,然后將其推送到我們存儲它的 S3。一旦所有這些都完成,它會回調一個服務,該服務將通過電子郵件通知用戶,讓他們知道文件已準備就緒。最初我寫這個是為了創(chuàng)建n 個通道,然后將 1 個 go-routine 附加到每個通道,并使 go-routine 處于無限循環(huán)中。這樣我就可以確保我一次只處理固定數(shù)量的下載。我意識到這不是應該使用通道的方式,如果我現(xiàn)在理解正確的話,實際上應該有一個帶有n 個go-routines 的通道在該通道上接收。每個 go-routine 都處于無限循環(huán)中,等待一條消息,當它收到消息時,它將處理數(shù)據(jù),做它應該做的一切,當它完成時,它將等待下一條消息。這讓我可以確保我一次只處理n 個文件。我認為這是正確的做法。我相信這是扇出,對吧?我并不需要做的,是要合并這些進程重新走到一起。下載完成后,它會回調遠程服務,以便處理剩余的過程。該應用程序無需執(zhí)行任何其他操作。好的,所以一些代碼:func main() {    queue, err := ConnectToQueue() // This works fine...    if err != nil {        log.Fatalf("Could not connect to queue: %s\n", err)    }    msgChannel := make(chan sqs.Message, 10)    for i := 0; i < MAX_CONCURRENT_ROUTINES; i++ {        go processMessage(msgChannel, queue)    }    for {        response, _ := queue.ReceiveMessage(MAX_SQS_MESSAGES)        for _, m := range response.Messages {            msgChannel <- m        }    }}func processMessage(ch <-chan sqs.Message, queue *sqs.Queue) {    for {        m := <-ch        // Do something with message m        // Delete message from queue when we're done        queue.DeleteMessage(&m)    }}我在這附近的任何地方嗎?我有n 個正在運行的 go-routines(其中MAX_CONCURRENT_ROUTINES= n)并且在循環(huán)中我們將繼續(xù)將消息傳遞到單個通道。這是正確的方法嗎?我需要關閉任何東西還是我可以無限期地運行它?我注意到的一件事是 SQS 正在返回消息,但是一旦我將 10 條消息傳入processMessage()(10 條是通道緩沖區(qū)的大?。?,實際上沒有進一步處理消息。
查看完整描述

1 回答

?
汪汪一只貓

TA貢獻1898條經(jīng)驗 獲得超8個贊

那看起來不錯。一些注意事項:


您可以通過限制您生成的工作程序例程數(shù)量以外的方式來限制工作并行度。例如,您可以為收到的每條消息創(chuàng)建一個 goroutine,然后讓生成的 goroutine 等待限制并行度的信號量。當然有權衡取舍,但您不僅限于您所描述的方式。


sem := make(chan struct{}, n)

work := func(m sqs.Message) {

    sem <- struct{}{} // When there's room we can proceed

    // do the work

    <-sem // Free room in the channel

}()

for _, m := range queue.ReceiveMessage(MAX_SQS_MESSAGES) {

    for _, m0 := range m {

        go work(m0)

    }

}

僅處理 10 條消息的限制是由堆棧中的其他地方造成的。可能您正在看到前 10 個填充通道的競賽,然后工作沒有完成,或者您可能不小心從工作程序例程中返回。如果您的員工按照您所描述的模式堅持不懈,您將希望確定他們不會回來。


不清楚您是否希望在處理了一定數(shù)量的消息后返回該進程。如果您確實希望此進程退出,則需要等待所有工作人員完成其當前任務,并可能在之后通知他們返回。看看sync.WaitGroup同步他們的完成,并有另一個通道來表示沒有更多的工作,或者 close msgChannel,并在你的工作人員中處理。(看看二元組返回通道接收表達式。)


查看完整回答
反對 回復 2021-11-08
  • 1 回答
  • 0 關注
  • 178 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網(wǎng)微信公眾號