2 回答

TA貢獻(xiàn)1934條經(jīng)驗(yàn) 獲得超2個贊
目前尚不清楚為什么每個工作需要一名工作人員,但如果您這樣做,您可以重組您的外循環(huán)設(shè)置(請參閱下面未經(jīng)測試的代碼)。這首先就消除了對工作池的需要。
不過,在解雇任何員工之前,請務(wù)必wg.Add 先執(zhí)行此操作。在這里,您正好剝離了 100 名員工:
var wg sync.WaitGroup
out = make(chan interface{}, 100)
go func() {
for i := 1; i <= 100; i++ {
go p.work(in, out, &wg)
}
wg.Wait()
close(out)
}()
因此,您可以這樣做:
var wg sync.WaitGroup
out = make(chan interface{}, 100)
go func() {
wg.Add(100) // ADDED - count the 100 workers
for i := 1; i <= 100; i++ {
go p.work(in, out, &wg)
}
wg.Wait()
close(out)
}()
請注意,您現(xiàn)在可以將wg其自身移至派生出工作線程的 goroutine 中。如果你放棄讓每個工人將工作分拆為新的 goroutine 的想法,這可以讓事情變得更干凈。但是,如果每個工作人員要派生另一個 goroutine,則該工作人員本身也必須使用wg.Add,如下所示:
for j := range jobs {
wg.Add(1) // ADDED - count the spun-off goroutines
func(j Job) {
res := doSomethingWith(j)
out <- res
wg.Done() // MOVED (for illustration only, can defer as before)
}(j)
}
wg.Done() // ADDED - our work in `p.work` is now done
wg.Add(1)也就是說,每個匿名函數(shù)都是通道的另一個用戶,因此在分拆新的 goroutine 之前增加通道用戶計數(shù) ( )。當(dāng)您讀完輸入通道后jobs,調(diào)用wg.Done()(可能通過較早的defer,但我在此處的末尾展示了它)。
思考這個問題的關(guān)鍵是計算此時可以wg寫入通道的活動 goroutine 的數(shù)量。只有當(dāng)沒有g(shù)oroutine 打算再寫入時,它才會變?yōu)榱恪?這使得關(guān)閉通道是安全的。
考慮使用相當(dāng)簡單的(但未經(jīng)測試):
func (p *pipe) Process(in chan interface{}) (out chan interface{}) {
out = make(chan interface{})
var wg sync.WaitGroup
go func() {
defer close(out)
for j := range in {
wg.Add(1)
go func(j Job) {
res := doSomethingWith(j)
out <- res
wg.Done()
}(j)
}
wg.Wait()
}()
return out
}
現(xiàn)在,您有一個 goroutine 正在in以最快的速度讀取通道,并在讀取過程中分拆作業(yè)。每項傳入的工作都會獲得一個 goroutine,除非他們提前完成工作。沒有池,每個工作只有一名工人(與您的代碼相同,只是我們淘汰了沒有做任何有用事情的池)。
或者,由于只有一定數(shù)量的 CPU 可用,請像之前在開始時所做的那樣分拆一定數(shù)量的 goroutine,但讓每個 goroutine 運(yùn)行一個作業(yè)直至完成,并交付其結(jié)果,然后返回讀取下一個作業(yè):
func (p *pipe) Process(in chan interface{}) (out chan interface{}) {
out = make(chan interface{})
go func() {
defer close(out)
var wg sync.WaitGroup
ncpu := runtime.NumCPU() // or something fancier if you like
wg.Add(ncpu)
for i := 0; i < ncpu; i++ {
go func() {
defer wg.Done()
for j := range in {
out <- doSomethingWith(j)
}
}()
}
wg.Wait()
}
return out
}
通過使用,runtime.NumCPU()我們只能獲得與運(yùn)行作業(yè)的 CPU 一樣多的工作線程來讀取作業(yè)。這些是池,它們一次只做一項工作。
如果輸出通道讀取器結(jié)構(gòu)良好(即不會導(dǎo)致管道阻塞),通常不需要緩沖輸出通道。如果不是,這里的緩沖深度會限制您可以“領(lǐng)先于”正在使用結(jié)果的人的工作數(shù)量。根據(jù)“提前工作”的有用程度來設(shè)置它,不一定是 CPU 數(shù)量、預(yù)期作業(yè)數(shù)量等。

TA貢獻(xiàn)1795條經(jīng)驗(yàn) 獲得超7個贊
作業(yè)的完成速度可能與發(fā)送的速度一樣快。在這種情況下,即使有更多的項目需要處理,WaitGroup 也會在零附近浮動。
解決此問題的一種方法是在發(fā)送作業(yè)之前添加一項,并在發(fā)送所有作業(yè)后減少該作業(yè),從而有效地將發(fā)送者視為“作業(yè)”之一。在這種情況下,我們最好在wg.Add發(fā)送方中執(zhí)行以下操作:
func (p *pipe) Process(in chan interface{}) (out chan interface{}) {
var wg sync.WaitGroup
out = make(chan interface{}, 100)
go func() {
for i := 1; i <= 100; i++ {
wg.Add(1)
go p.work(in, out, &wg)
}
wg.Wait()
close(out)
}()
return
}
func (p *pipe) work(jobs <-chan interface{}, out chan<- interface{}, wg *sync.WaitGroup) {
for j := range jobs {
func(j Job) {
res := doSomethingWith(j)
out <- res
wg.Done()
}(j)
}
}
我在代碼中注意到的一件事是,每個作業(yè)都會啟動一個 goroutine。同時,每個作業(yè)jobs循環(huán)處理通道,直到清空/關(guān)閉。似乎沒有必要兩者都做。
- 2 回答
- 0 關(guān)注
- 199 瀏覽
添加回答
舉報