第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機(jī)立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

關(guān)閉和發(fā)送到通道之間的競爭條件

關(guān)閉和發(fā)送到通道之間的競爭條件

Go
一只甜甜圈 2023-08-14 17:17:21
我正在嘗試使用工作池構(gòu)建通用管道庫。我為源、管道和接收器創(chuàng)建了一個接口。您會看到,管道的工作是從輸入通道接收數(shù)據(jù),對其進(jìn)行處理,然后將結(jié)果輸出到通道上。這是它的預(yù)期行為:從輸入通道接收數(shù)據(jù)。將數(shù)據(jù)委托給可用的工作人員。Worker 將結(jié)果發(fā)送到輸出通道。所有工作人員完成后關(guān)閉輸出通道。func (p *pipe) Process(in chan interface{}) (out chan interface{}) {    var wg sync.WaitGroup    out = make(chan interface{}, 100)    go func() {        for i := 1; i <= 100; i++ {            go p.work(in, out, &wg)        }        wg.Wait()        close(out)    }()    return}func (p *pipe) work(jobs <-chan interface{}, out chan<- interface{}, wg *sync.WaitGroup) {    for j := range jobs {        func(j Job) {            defer wg.Done()            wg.Add(1)            res := doSomethingWith(j)            out <- res        }(j)    }}但是,運(yùn)行它可能會退出而不處理所有輸入,或者出現(xiàn)錯誤并顯示send on closed channel消息。使用該標(biāo)志構(gòu)建源會在和-race之間發(fā)出數(shù)據(jù)爭用警告。close(out)out <- res我認(rèn)為可能會發(fā)生以下情況。一旦一些工人完成了工作,wg計數(shù)器就會瞬間歸零。因此,wg.Wait()完成并且程序繼續(xù)進(jìn)行close(out)。與此同時,作業(yè)通道尚未完成數(shù)據(jù)生成,這意味著一些工作人員仍在另一個 goroutine 中運(yùn)行。由于out通道已經(jīng)關(guān)閉,因此會導(dǎo)致恐慌。等待組應(yīng)該放在其他地方嗎?或者有沒有更好的方法來等待所有工人完成?
查看完整描述

2 回答

?
撒科打諢

TA貢獻(xiàn)1934條經(jīng)驗(yàn) 獲得超2個贊

目前尚不清楚為什么每個工作需要一名工作人員,但如果您這樣做,您可以重組您的外循環(huán)設(shè)置(請參閱下面未經(jīng)測試的代碼)。這首先就消除了對工作池的需要。


不過,在解雇任何員工之前,請務(wù)必wg.Add 先執(zhí)行此操作。在這里,您正好剝離了 100 名員工:


var wg sync.WaitGroup

out = make(chan interface{}, 100)

go func() {

    for i := 1; i <= 100; i++ {

        go p.work(in, out, &wg)

    }

    wg.Wait()

    close(out)

}()

因此,您可以這樣做:


var wg sync.WaitGroup

out = make(chan interface{}, 100)

go func() {

    wg.Add(100)  // ADDED - count the 100 workers

    for i := 1; i <= 100; i++ {

        go p.work(in, out, &wg)

    }

    wg.Wait()

    close(out)

}()

請注意,您現(xiàn)在可以將wg其自身移至派生出工作線程的 goroutine 中。如果你放棄讓每個工人將工作分拆為新的 goroutine 的想法,這可以讓事情變得更干凈。但是,如果每個工作人員要派生另一個 goroutine,則該工作人員本身也必須使用wg.Add,如下所示:


for j := range jobs {

    wg.Add(1)  // ADDED - count the spun-off goroutines

    func(j Job) {

        res := doSomethingWith(j)


        out <- res

        wg.Done()  // MOVED (for illustration only, can defer as before)

    }(j)

}

wg.Done() // ADDED - our work in `p.work` is now done

wg.Add(1)也就是說,每個匿名函數(shù)都是通道的另一個用戶,因此在分拆新的 goroutine 之前增加通道用戶計數(shù) ( )。當(dāng)您讀完輸入通道后jobs,調(diào)用wg.Done()(可能通過較早的defer,但我在此處的末尾展示了它)。


思考這個問題的關(guān)鍵是計算此時可以wg寫入通道的活動 goroutine 的數(shù)量。只有當(dāng)沒有g(shù)oroutine 打算再寫入時,它才會變?yōu)榱恪?這使得關(guān)閉通道是安全的。


考慮使用相當(dāng)簡單的(但未經(jīng)測試):


func (p *pipe) Process(in chan interface{}) (out chan interface{}) {

    out = make(chan interface{})

    var wg sync.WaitGroup

    go func() {

        defer close(out)

        for j := range in {

            wg.Add(1)

            go func(j Job) {

                res := doSomethingWith(j)

                out <- res

                wg.Done()

            }(j)

        }

        wg.Wait()

    }()

    return out

}

現(xiàn)在,您有一個 goroutine 正在in以最快的速度讀取通道,并在讀取過程中分拆作業(yè)。每項傳入的工作都會獲得一個 goroutine,除非他們提前完成工作。沒有池,每個工作只有一名工人(與您的代碼相同,只是我們淘汰了沒有做任何有用事情的池)。


或者,由于只有一定數(shù)量的 CPU 可用,請像之前在開始時所做的那樣分拆一定數(shù)量的 goroutine,但讓每個 goroutine 運(yùn)行一個作業(yè)直至完成,并交付其結(jié)果,然后返回讀取下一個作業(yè):


func (p *pipe) Process(in chan interface{}) (out chan interface{}) {

    out = make(chan interface{})

    go func() {

        defer close(out)

        var wg sync.WaitGroup

        ncpu := runtime.NumCPU()  // or something fancier if you like

        wg.Add(ncpu)

        for i := 0; i < ncpu; i++ {

            go func() {

                defer wg.Done()

                for j := range in {

                    out <- doSomethingWith(j)

                }

            }()

        }

        wg.Wait()

    }

    return out

}

通過使用,runtime.NumCPU()我們只能獲得與運(yùn)行作業(yè)的 CPU 一樣多的工作線程來讀取作業(yè)。這些是池,它們一次只做一項工作。


如果輸出通道讀取器結(jié)構(gòu)良好(即不會導(dǎo)致管道阻塞),通常不需要緩沖輸出通道。如果不是,這里的緩沖深度會限制您可以“領(lǐng)先于”正在使用結(jié)果的人的工作數(shù)量。根據(jù)“提前工作”的有用程度來設(shè)置它,不一定是 CPU 數(shù)量、預(yù)期作業(yè)數(shù)量等。


查看完整回答
反對 回復(fù) 2023-08-14
?
一只萌萌小番薯

TA貢獻(xiàn)1795條經(jīng)驗(yàn) 獲得超7個贊

作業(yè)的完成速度可能與發(fā)送的速度一樣快。在這種情況下,即使有更多的項目需要處理,WaitGroup 也會在零附近浮動。


解決此問題的一種方法是在發(fā)送作業(yè)之前添加一項,并在發(fā)送所有作業(yè)后減少該作業(yè),從而有效地將發(fā)送者視為“作業(yè)”之一。在這種情況下,我們最好在wg.Add發(fā)送方中執(zhí)行以下操作:


func (p *pipe) Process(in chan interface{}) (out chan interface{}) {

    var wg sync.WaitGroup

    out = make(chan interface{}, 100)

    go func() {

        for i := 1; i <= 100; i++ {

            wg.Add(1)

            go p.work(in, out, &wg)

        }

        wg.Wait()

        close(out)

    }()


    return

}


func (p *pipe) work(jobs <-chan interface{}, out chan<- interface{}, wg *sync.WaitGroup) {

    for j := range jobs {

        func(j Job) {

            res := doSomethingWith(j)


            out <- res

            wg.Done()

        }(j)

    }

}

我在代碼中注意到的一件事是,每個作業(yè)都會啟動一個 goroutine。同時,每個作業(yè)jobs循環(huán)處理通道,直到清空/關(guān)閉。似乎沒有必要兩者都做。


查看完整回答
反對 回復(fù) 2023-08-14
  • 2 回答
  • 0 關(guān)注
  • 199 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學(xué)習(xí)伙伴

公眾號

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號