3 回答

TA貢獻(xiàn)1998條經(jīng)驗(yàn) 獲得超6個(gè)贊
sync.WaitGroup
這是一個(gè)簡單的計(jì)數(shù)器,其Wait
方法將阻塞直到計(jì)數(shù)器值達(dá)到零。它旨在允許您在允許主要執(zhí)行流程繼續(xù)進(jìn)行之前阻止(或加入)多個(gè) goroutine。
的界面WaitGroup
對于您的用例而言沒有足夠的表現(xiàn)力,也不是設(shè)計(jì)為。特別是,您不能通過簡單地調(diào)用wg.Add(y)
(where y < x) 來天真地使用它。wg.Done
第 (y+1)個(gè)goroutine調(diào)用將導(dǎo)致 panic,因?yàn)榈却M具有負(fù)內(nèi)部值是錯(cuò)誤的。此外,我們不能通過觀察 ; 的內(nèi)部計(jì)數(shù)器值來“聰明”?WaitGroup
。這會破壞抽象,無論如何,它的內(nèi)部狀態(tài)不會被導(dǎo)出。
實(shí)現(xiàn)你自己的!
您可以根據(jù)下面的代碼使用一些渠道自己實(shí)現(xiàn)相關(guān)邏輯。從控制臺觀察,啟動了 10 個(gè) goroutine,但在兩個(gè)完成后,我們 fallthrough 繼續(xù)在 main 方法中執(zhí)行。
package main
import (
? ? "fmt"
? ? "time"
)
// Set goroutine counts here
const (
? ? // The number of goroutines to spawn
? ? x = 10
? ? // The number of goroutines to wait for completion
? ? // (y <= x) must hold.
? ? y = 2
)
func doSomeWork() {
? ? // do something meaningful
? ? time.Sleep(time.Second)
}
func main() {
? ? // Accumulator channel, used by each goroutine to signal completion.
? ? // It is buffered to ensure the [y+1, ..., x) goroutines do not block
? ? // when sending to the channel, which would cause a leak. It will be
? ? // garbage collected when all goroutines end and the channel falls
? ? // out of scope. We receive y values, so only need capacity to receive
? ? // (x-y) remaining values.
? ? accChan := make(chan struct{}, x-y)
? ? // Spawn "x" goroutines
? ? for i := 0; i < x; i += 1 {
? ? ? ? // Wrap our work function with the local signalling logic
? ? ? ? go func(id int, doneChan chan<- struct{}) {
? ? ? ? ? ? fmt.Printf("starting goroutine #%d\n", id)
? ? ? ? ? ? doSomeWork()
? ? ? ? ? ? fmt.Printf("goroutine #%d completed\n", id)
? ? ? ? ? ? // Communicate completion of goroutine
? ? ? ? ? ? doneChan <- struct{}{}
? ? ? ? }(i, accChan)
? ? }
? ? for doneCount := 0; doneCount < y; doneCount += 1 {
? ? ? ? <-accChan
? ? }
? ? // Continue working
? ? fmt.Println("Carrying on without waiting for more goroutines")
}
避免泄漏資源
由于這不會等待 [y+1, ..., x) goroutines 完成,因此您應(yīng)該特別注意函數(shù)doSomeWork以消除或最小化工作可能無限期阻塞的風(fēng)險(xiǎn),這也會導(dǎo)致泄漏。在可能的情況下,消除無限期阻塞 I/O(包括通道操作)或陷入無限循環(huán)的可能性。
context
當(dāng)不再需要它們的結(jié)果來中斷執(zhí)行時(shí),您可以使用 a向其他 goroutine 發(fā)出信號。

TA貢獻(xiàn)1836條經(jīng)驗(yàn) 獲得超4個(gè)贊
WaitGroup
實(shí)際上并不等待 goroutines,它一直等到其內(nèi)部計(jì)數(shù)器達(dá)到零。如果你只有Add()
你關(guān)心的 goroutines 的數(shù)量,并且你只調(diào)用Done()
你關(guān)心的那些 goroutines,那么Wait()
只會阻塞直到你關(guān)心的那些 goroutines 完成。您可以完全控制邏輯和流程,對WaitGroup
“允許”的內(nèi)容沒有任何限制。

TA貢獻(xiàn)1815條經(jīng)驗(yàn) 獲得超6個(gè)贊
這些是您要跟蹤的特定 y 例程,還是 x 中的任何 y?標(biāo)準(zhǔn)是什么?
更新:
1. 如果您可以控制任何標(biāo)準(zhǔn)來選擇matching ygo-routines:
wp.wg.Add(1)如果無法wp.wg.Done()在 goroutine 外部檢查您的條件,則可以通過將其作為指針參數(shù)傳遞給 goroutine 來根據(jù)您的條件從 goroutine 內(nèi)部執(zhí)行和操作。
類似于下面的示例代碼。如果您提供有關(guān)您正在嘗試做的事情的更多詳細(xì)信息,將能夠更具體。
func sampleGoroutine(z int, b string, wg *sync.WaitGroup){
? ? defer func(){
? ? ? ? if contition1{
? ? ? ? ? ? wg.Done()
? ? ? ? }
? ? }
? ? if contition1 {
? ? ? ? wg.Add(1)
? ? ? ? //do stuff
? ? }
}
func main() {
? ? wg := sync.WaitGroup{}
? ? for i := 0; i < x; i++ {
? ? ? ? go sampleGoroutine(1, "one", &wg)
? ? }
? ? wg.Wait()
}
2. 如果你無法控制哪些,只想要first y:
根據(jù)您的評論,您無法控制/希望選擇任何特定的 goroutine,而是選擇最先完成的 goroutine。如果您想以通用方式執(zhí)行此操作,則可以使用適合您的用例的以下自定義 waitGroup 實(shí)現(xiàn)。(不過,它不是復(fù)制安全的。也沒有/需要 wg.Add(int) 方法)
type CountedWait struct {
? ? wait? chan struct{}
? ? limit int
}
func NewCountedWait(limit int) *CountedWait {
? ? return &CountedWait{
? ? ? ? wait:? make(chan struct{}, limit),
? ? ? ? limit: limit,
? ? }
}
func (cwg *CountedWait) Done() {
? ? cwg.wait <- struct{}{}
}
func (cwg *CountedWait) Wait() {
? ? count := 0
? ? for count < cwg.limit {
? ? ? ? <-cwg.wait
? ? ? ? count += 1
? ? }
}
可以按如下方式使用:
func sampleGoroutine(z int, b string, wg *CountedWait) {
? ? success := false
? ? defer func() {
? ? ? ? if success == true {
? ? ? ? ? ? fmt.Printf("goroutine %d finished successfully\n", z)
? ? ? ? ? ? wg.Done()
? ? ? ? }
? ? }()
? ? fmt.Printf("goroutine %d started\n", z)
? ? time.Sleep(time.Second)
? ? if rand.Intn(10)%2 == 0 {
? ? ? ? success = true
? ? }
}
func main() {
? ? x := 10
? ? y := 3
? ? wg := NewCountedWait(y)
? ? for i := 0; i < x; i += 1 {
? ? ? ? // Wrap our work function with the local signalling logic
? ? ? ? go sampleGoroutine(i, "something", wg)
? ? }
? ? wg.Wait()
? ? fmt.Printf("%d out of %d goroutines finished successfully.\n", y, x)
}
3. 你也可以加入context2 以確保剩余的 goroutines 不會泄漏 你可能無法在 play.golang 上運(yùn)行它,因?yàn)樗幸恍╅L時(shí)間的休眠。
下面是一個(gè)示例輸出:(注意,可能有超過 y=3 個(gè) goroutines 標(biāo)記完成,但你只等到 3 個(gè)完成)
goroutine 9 started
goroutine 0 started
goroutine 1 started
goroutine 2 started
goroutine 3 started
goroutine 4 started
goroutine 5 started
goroutine 5 marking done
goroutine 6 started
goroutine 7 started
goroutine 7 marking done
goroutine 8 started
goroutine 3 marking done
continuing after 3 out of 10 goroutines finished successfully.
goroutine 9 will be killed, bcz cancel
goroutine 8 will be killed, bcz cancel
goroutine 6 will be killed, bcz cancel
goroutine 1 will be killed, bcz cancel
goroutine 0 will be killed, bcz cancel
goroutine 4 will be killed, bcz cancel
goroutine 2 will be killed, bcz cancel
- 3 回答
- 0 關(guān)注
- 171 瀏覽
添加回答
舉報(bào)