2 回答

TA貢獻1868條經驗 獲得超4個贊
在完成將所有行發(fā)送到 之前,您不會開始讀取,這比緩沖區(qū)空間的行數多。因此,一旦緩沖區(qū)已滿,發(fā)送塊(開始填充緩沖區(qū)),一旦緩沖區(qū)已滿,它就會死鎖。將發(fā)送到 的循環(huán)、從 讀取的循環(huán)或兩者一起移動,以分隔 goroutine,例如:doneQlineParseQdoneQlineParseQlineParseQdoneQ
go func() {
for _, line := range lines {
countSend++
lineParseQ <- line
}
close(lineParseQ)
}()
這仍然會在結束時陷入僵局,因為你已經在同一個goroutine中有一個過端通道和之后的通道;由于繼續(xù)直到通道關閉,并且關閉在完成后,您仍然有一個死鎖。您需要將關閉放在適當的位置;也就是說,無論是在發(fā)送例程中,還是在監(jiān)視發(fā)送例程時,如果給定通道有多個發(fā)送方,則阻止發(fā)送例程。rangecloserangerangeWaitGroup
// Start line parsing workers and send to jobProcessQ
wg := new(sync.WaitGroup)
for i := 1; i <= 2; i++ {
wg.Add(1)
go lineToStructWorker(i, lineParseQ, jobProcessQ, wg)
}
// Process myStruct from jobProcessQ
for i := 1; i <= 5; i++ {
go WorkerProcessStruct(i, jobProcessQ, doneQ)
}
countSend := 0
go func() {
for _, line := range lines {
countSend++
lineParseQ <- line
}
close(lineParseQ)
}()
go func() {
wg.Wait()
close(jobProcessQ)
}()
for a := range doneQ {
fmt.Printf("Received %v.\n", a)
}
// ...
func lineToStructWorker(workerID int, lineQ <-chan string, strQ chan<- myStruct, wg *sync.WaitGroup) {
for j := range lineQ {
strQ <- lineToStruct(j) // just parses the csv to a struct...
}
wg.Done()
}
func WorkerProcessStruct(workerID int, strQ <-chan myStruct, done chan<- myStruct) {
for a := range strQ {
time.Sleep(time.Millisecond * 500) // fake long operation...
done <- a
}
close(done)
}
完整的工作示例如下:https://play.golang.org/p/XsnewSZeb2X

TA貢獻1863條經驗 獲得超2個贊
協調管道,將每個部分分成幾個階段。當您知道管道的一部分已完成(并且沒有人寫入特定通道)時,請關閉該通道以指示所有“工作人員”退出,例如sync.WaitGroup
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
i := i
wg.Add(1)
go func() {
Worker(i)
wg.Done()
}()
}
// wg.Wait() signals the above have completed
緩沖通道對于處理突發(fā)工作負載非常方便,但有時它們用于避免不良設計中的死鎖。如果要避免在 goroutine 中運行管道的某些部分,可以緩沖一些通道(通常與 worker 的數量匹配),以避免主 goroutine 堵塞。
如果您有讀取和寫入的依賴部分,并希望避免死鎖 - 請確保它們位于單獨的goroutine中。將管道的所有部分都擁有自己的goroutine,甚至可以消除對緩沖通道的需求:
// putting all channel work into separate goroutines
// removes the need for buffered channels
lineParseQ := make(chan string, 0)
jobProcessQ := make(chan myStruct, 0)
doneQ := make(chan myStruct, 0)
當然,這是一個權衡 - 一個goroutine的資源成本約為2K - 而緩沖通道要少得多。與大多數設計一樣,這取決于如何使用它。
也不要被臭名昭著的Go for-loop gotcha抓住,所以使用閉包賦值來避免這種情況:
for i := 1; i <= 5; i++ {
i := i // new i (not the i above)
go func() {
myfunc(i) // otherwise all goroutines will most likely get '5'
}()
}
最后,請確保在退出之前等待所有結果得到處理。從基于通道的函數返回并認為所有結果都已處理是一個常見的錯誤。在服務中,這最終將是正確的。但在獨立的可執(zhí)行文件中,處理循環(huán)可能仍在處理結果。
go func() {
wgW.Wait() // waiting on worker goroutines to finish
close(doneQ) // safe to close results channel now
}()
// ensure we don't return until all results have been processed
for a := range doneQ {
fmt.Printf("Received %v.\n", a)
}
通過在主goroutine中處理結果,我們確保在未處理所有內容的情況下不會過早返回。
將它們全部放在一起:
https://play.golang.org/p/MjLpQ5xglP3
- 2 回答
- 0 關注
- 117 瀏覽
添加回答
舉報