4 回答

TA貢獻(xiàn)1796條經(jīng)驗(yàn) 獲得超10個(gè)贊
是的,WaitGroup
是正確的答案。根據(jù)doc?,您可以隨時(shí)使用WaitGroup.Add
計(jì)數(shù)器大于零。
請(qǐng)注意,當(dāng)計(jì)數(shù)器為零時(shí)發(fā)生的具有正增量的調(diào)用必須發(fā)生在等待之前。具有負(fù)增量的調(diào)用或在計(jì)數(shù)器大于零時(shí)開始的具有正增量的調(diào)用可能隨時(shí)發(fā)生。通常這意味著對(duì) Add 的調(diào)用應(yīng)該在創(chuàng)建 goroutine 或其他要等待的事件的語句之前執(zhí)行。如果重復(fù)使用 WaitGroup 來等待多個(gè)獨(dú)立的事件集,則必須在所有先前的 Wait 調(diào)用返回后發(fā)生新的 Add 調(diào)用。請(qǐng)參閱 WaitGroup 示例。
Close
但是一個(gè)技巧是,在調(diào)用之前,您應(yīng)該始終保持計(jì)數(shù)器大于零。這通常意味著您應(yīng)該調(diào)用wg.Add
in?NewFoo
(或類似的東西)并wg.Done
in?Close
.?并且為了防止多次調(diào)用Done
破壞等待組,你應(yīng)該包裝Close
成sync.Once
.?您可能還想防止Bar()
調(diào)用 new。

TA貢獻(xiàn)1811條經(jīng)驗(yàn) 獲得超6個(gè)贊
我認(rèn)為無限期地等待所有 go 例程完成不是正確的方法。如果其中一個(gè) go routines 被阻塞或說它由于某種原因掛起并且從未成功終止,應(yīng)該發(fā)生什么情況 kill 進(jìn)程或等待 go routines 完成?
相反,無論所有例程是否已完成,您都應(yīng)該等待一段時(shí)間并終止應(yīng)用程序。
上下文包可用于向所有 go 例程發(fā)送信號(hào)以處理 kill 信號(hào)。
appCtx, cancel := context.WithCancel(context.Background())
這里 appCtx 必須傳遞給所有的 go 例程。
在退出信號(hào)調(diào)用cancel()
。
作為 go 例程運(yùn)行的函數(shù)可以處理如何處理取消上下文。

TA貢獻(xiàn)1853條經(jīng)驗(yàn) 獲得超18個(gè)贊
WaitGroup是一種方式,但是,Go 團(tuán)隊(duì)errgroup完全針對(duì)您的用例引入了。leaf bebop 的回答中最不方便的部分是忽視錯(cuò)誤處理。錯(cuò)誤處理是存在的原因errgroup。慣用的 go 代碼不應(yīng)該吞下錯(cuò)誤。
但是,保留結(jié)構(gòu)的簽名Foo(裝飾性的除外workerNumber)——并且沒有錯(cuò)誤處理——我的建議如下所示:
package main
import (
"fmt"
"math/rand"
"time"
"golang.org/x/sync/errgroup"
)
type Foo struct {
errg errgroup.Group
}
func NewFoo() *Foo {
foo := &Foo{
errg: errgroup.Group{},
}
return foo
}
func (a *Foo) Bar(workerNumber int) {
a.errg.Go(func() error {
select {
// simulates the long running clals
case <-time.After(time.Second * time.Duration(rand.Intn(10))):
fmt.Println(fmt.Sprintf("worker %d completed its work", workerNumber))
return nil
}
})
}
func (a *Foo) Close() {
a.errg.Wait()
}
func main() {
foo := NewFoo()
for i := 0; i < 10; i++ {
foo.Bar(i)
}
<-time.After(time.Second * 5)
fmt.Println("Waiting for workers to complete...")
foo.Close()
fmt.Println("Done.")
}
這里的好處是,如果你在你的代碼中引入錯(cuò)誤處理(你應(yīng)該),你只需要稍微修改這段代碼:簡(jiǎn)而言之,將返回errg.Wait()第一個(gè) redis 錯(cuò)誤,并且Close()可以通過堆棧向上傳播它(到 main,在這種情況下)。
也可以使用該context.Context包,如果調(diào)用失敗,您還可以立即取消任何正在運(yùn)行的 redis 調(diào)用。文檔中有這方面的示例errgroup。

TA貢獻(xiàn)2039條經(jīng)驗(yàn) 獲得超8個(gè)贊
我經(jīng)常使用的模式是:https ://play.golang.org/p/ibMz36TS62z
package main
import (
"fmt"
"sync"
"time"
)
type response struct {
message string
}
func task(i int, done chan response) {
time.Sleep(1 * time.Second)
done <- response{fmt.Sprintf("%d done", i)}
}
func main() {
responses := GetResponses(10)
fmt.Println("all done", len(responses))
}
func GetResponses(n int) []response {
donequeue := make(chan response)
wg := sync.WaitGroup{}
for i := 0; i < n; i++ {
wg.Add(1)
go func(value int) {
defer wg.Done()
task(value, donequeue)
}(i)
}
go func() {
wg.Wait()
close(donequeue)
}()
responses := []response{}
for result := range donequeue {
responses = append(responses, result)
}
return responses
}
這也使得節(jié)流變得容易:https ://play.golang.org/p/a4MKwJKj634
package main
import (
"fmt"
"sync"
"time"
)
type response struct {
message string
}
func task(i int, done chan response) {
time.Sleep(1 * time.Second)
done <- response{fmt.Sprintf("%d done", i)}
}
func main() {
responses := GetResponses(10, 2)
fmt.Println("all done", len(responses))
}
func GetResponses(n, concurrent int) []response {
throttle := make(chan int, concurrent)
for i := 0; i < concurrent; i++ {
throttle <- i
}
donequeue := make(chan response)
wg := sync.WaitGroup{}
for i := 0; i < n; i++ {
wg.Add(1)
<-throttle
go func(value int) {
defer wg.Done()
throttle <- 1
task(value, donequeue)
}(i)
}
go func() {
wg.Wait()
close(donequeue)
}()
responses := []response{}
for result := range donequeue {
responses = append(responses, result)
}
return responses
}
- 4 回答
- 0 關(guān)注
- 220 瀏覽
添加回答
舉報(bào)