第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

使用通道作為隊列的死鎖

使用通道作為隊列的死鎖

Go
哆啦的時光機 2022-09-05 10:39:01
我正在學習 Go,我正在嘗試實現作業(yè)隊列。我想做的是:讓主 goroutine 饋送行通過一個通道供多個解析器工作線程(將一條線解析為 s 結構),并讓每個分析器將結構發(fā)送到其他工作線程(goroutines)將處理的結構通道(發(fā)送到數據庫等)。代碼如下所示:lineParseQ := make(chan string, 5)jobProcessQ := make(chan myStruct, 5)doneQ := make(chan myStruct, 5)fileName := "myfile.csv"file, err := os.Open(fileName)if err != nil {    log.Fatal(err)}defer file.Close()reader := bufio.NewReader(file)// Start line parsing workers and send to jobProcessQfor i := 1; i <= 2; i++ {    go lineToStructWorker(i, lineParseQ, jobProcessQ)}// Process myStruct from jobProcessQfor i := 1; i <= 5; i++ {    go WorkerProcessStruct(i, jobProcessQ, doneQ)}lineCount := 0 countSend := 0for {    line, err := reader.ReadString('\n')        if err != nil && err != io.EOF {        log.Fatal(err)    }        if err == io.EOF {        break    }        lineCount++        if lineCount > 1 {        countSend++        lineParseQ <- line[:len(line)-1]    // Avoid last char '\n'    }}for i := 0; i < countSend; i++ {    fmt.Printf("Received %+v.\n", <-doneQ)}close(doneQ)close(jobProcessQ)close(lineParseQ)這是一個簡化的游樂場:https://play.golang.org/p/yz84g6CJraa工人看起來像這樣:func lineToStructWorker(workerID int, lineQ <-chan string, strQ chan<- myStruct ) {    for j := range lineQ {        strQ <- lineToStruct(j) // just parses the csv to a struct...    }}func WorkerProcessStruct(workerID int, strQ <-chan myStruct, done chan<- myStruct) {    for a := range strQ {        time.Sleep(time.Millisecond * 500) // fake long operation...        done <- a    }}我知道問題與“完成”通道有關,因為如果我不使用它,就不會有錯誤,但我不知道如何解決它。
查看完整描述

2 回答

?
MYYA

TA貢獻1868條經驗 獲得超4個贊

在完成將所有行發(fā)送到 之前,您不會開始讀取,這比緩沖區(qū)空間的行數多。因此,一旦緩沖區(qū)已滿,發(fā)送塊(開始填充緩沖區(qū)),一旦緩沖區(qū)已滿,它就會死鎖。將發(fā)送到 的循環(huán)、從 讀取的循環(huán)或兩者一起移動,以分隔 goroutine,例如:doneQlineParseQdoneQlineParseQlineParseQdoneQ


go func() {

    for _, line := range lines {

        countSend++

        lineParseQ <- line

    }

    close(lineParseQ)

}()

這仍然會在結束時陷入僵局,因為你已經在同一個goroutine中有一個過端通道和之后的通道;由于繼續(xù)直到通道關閉,并且關閉在完成后,您仍然有一個死鎖。您需要將關閉放在適當的位置;也就是說,無論是在發(fā)送例程中,還是在監(jiān)視發(fā)送例程時,如果給定通道有多個發(fā)送方,則阻止發(fā)送例程。rangecloserangerangeWaitGroup


// Start line parsing workers and send to jobProcessQ

wg := new(sync.WaitGroup)

for i := 1; i <= 2; i++ {

    wg.Add(1)

    go lineToStructWorker(i, lineParseQ, jobProcessQ, wg)

}


// Process myStruct from jobProcessQ

for i := 1; i <= 5; i++ {

    go WorkerProcessStruct(i, jobProcessQ, doneQ)

}


countSend := 0


go func() {

    for _, line := range lines {

        countSend++

        lineParseQ <- line

    }

    close(lineParseQ)

}()


go func() {

    wg.Wait()

    close(jobProcessQ)

}()


for a := range doneQ {

    fmt.Printf("Received %v.\n", a)

}


// ...


func lineToStructWorker(workerID int, lineQ <-chan string, strQ chan<- myStruct, wg *sync.WaitGroup) {

    for j := range lineQ {

        strQ <- lineToStruct(j) // just parses the csv to a struct...

    }

    wg.Done()

}


func WorkerProcessStruct(workerID int, strQ <-chan myStruct, done chan<- myStruct) {

    for a := range strQ {

        time.Sleep(time.Millisecond * 500) // fake long operation...

        done <- a

    }

    close(done)

}

完整的工作示例如下:https://play.golang.org/p/XsnewSZeb2X


查看完整回答
反對 回復 2022-09-05
?
小唯快跑啊

TA貢獻1863條經驗 獲得超2個贊

協調管道,將每個部分分成幾個階段。當您知道管道的一部分已完成(并且沒有人寫入特定通道)時,請關閉該通道以指示所有“工作人員”退出,例如sync.WaitGroup


var wg sync.WaitGroup

for i := 1; i <= 5; i++ {

    i := i

    wg.Add(1)

    go func() {

        Worker(i)

        wg.Done()

    }()

}


// wg.Wait() signals the above have completed

緩沖通道對于處理突發(fā)工作負載非常方便,但有時它們用于避免不良設計中的死鎖。如果要避免在 goroutine 中運行管道的某些部分,可以緩沖一些通道(通常與 worker 的數量匹配),以避免主 goroutine 堵塞。


如果您有讀取和寫入的依賴部分,并希望避免死鎖 - 請確保它們位于單獨的goroutine中。將管道的所有部分都擁有自己的goroutine,甚至可以消除對緩沖通道的需求:


// putting all channel work into separate goroutines

// removes the need for buffered channels

lineParseQ := make(chan string, 0)

jobProcessQ := make(chan myStruct, 0)

doneQ := make(chan myStruct, 0)

當然,這是一個權衡 - 一個goroutine的資源成本約為2K - 而緩沖通道要少得多。與大多數設計一樣,這取決于如何使用它。


也不要被臭名昭著的Go for-loop gotcha抓住,所以使用閉包賦值來避免這種情況:


for i := 1; i <= 5; i++ {

    i := i       // new i (not the i above)

    go func() {

        myfunc(i) // otherwise all goroutines will most likely get '5'

    }()

}

最后,請確保在退出之前等待所有結果得到處理。從基于通道的函數返回并認為所有結果都已處理是一個常見的錯誤。在服務中,這最終將是正確的。但在獨立的可執(zhí)行文件中,處理循環(huán)可能仍在處理結果。


go func() {

    wgW.Wait()   // waiting on worker goroutines to finish

    close(doneQ) // safe to close results channel now

}()


// ensure we don't return until all results have been processed

for a := range doneQ {

    fmt.Printf("Received %v.\n", a)

}

通過在主goroutine中處理結果,我們確保在未處理所有內容的情況下不會過早返回。


將它們全部放在一起:


https://play.golang.org/p/MjLpQ5xglP3


查看完整回答
反對 回復 2022-09-05
  • 2 回答
  • 0 關注
  • 117 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號