1 回答

TA貢獻1880條經(jīng)驗 獲得超4個贊
這一切都很好,但我遇到的問題是堆棧本身的長度可能為零,所以在我嘗試將堆棧 [0] 發(fā)送到 jobchan 的情況下,如果堆棧為空,則會出現(xiàn)越界錯誤。
我無法使用您的游樂場鏈接重現(xiàn)它,但它是可信的,因為至少有一名gofunc
工作人員可能已經(jīng)準備好在該頻道上接收。
我的輸出是Msgcnt: 0
,這也很容易解釋,因為在運行它時gofunc
可能還沒有準備好接收。這些操作的順序沒有定義。jobschan
dispatch()
select
嘗試創(chuàng)建一個調(diào)度 go 例程,將作業(yè)發(fā)送到在 jobchan 頻道上監(jiān)聽的工作池
通道不需要調(diào)度程序。通道是調(diào)度程序。
如果一條消息通過 dispatchchan 通道進入我的調(diào)度函數(shù)并且我的其他 go 例程正忙,則消息是 [...] 將 [...] 稍后當工作人員可用時再次發(fā)送,[...] 或否在 dispatchchan 上收到更多消息。
通過一些創(chuàng)造性的編輯,很容易將其變成接近緩沖通道定義的東西。它可以立即讀取,也可以占用一些limit
無法立即發(fā)送的消息。你確實定義limit
了,盡管它沒有在你的代碼中的其他地方使用。
在任何函數(shù)中,定義一個您不讀取的變量都會導致編譯時錯誤,例如limit declared but not used
. 這種限制提高了代碼質量并有助于識別類型。但是在包范圍內(nèi),您已經(jīng)擺脫了將未使用的定義limit
為“全局”,從而避免了一個有用的錯誤——您沒有限制任何東西。
不要使用全局變量。使用傳遞的參數(shù)來定義作用域,因為作用域的定義相當于用go
關鍵字表達的功能并發(fā)。 將本地范圍內(nèi)定義的相關通道傳遞給包范圍內(nèi)定義的函數(shù),以便您輕松跟蹤它們的關系。并使用定向渠道來強制執(zhí)行您的功能之間的生產(chǎn)者/消費者關系。稍后再談。
回到“限制”,限制您排隊的作業(yè)數(shù)量是有意義的,因為所有資源都是有限的,并且接受比您對處理的任何預期更多的消息需要比進程內(nèi)存提供的更持久的存儲。如果你覺得無論如何都沒有義務滿足這些要求,那么一開始就不要接受“太多”的要求。
那么,什么功能有dispatchchan
和dispatch()
?在處理之前存儲有限數(shù)量的待處理請求(如果有的話),然后將它們發(fā)送給下一個可用的工作人員?這正是緩沖通道的用途。
循環(huán)邏輯
誰“知道”你的程序何時完成? main()
提供初始輸入,但在 `dispatch() 中關閉所有 3 個通道:
close(jobchan) close(dispatchchan) close(mw)
您的工作人員寫入自己的作業(yè)隊列,因此只有在工作人員完成寫入后才能關閉傳入的作業(yè)隊列。但是,個別工作人員也不知道何時關閉作業(yè)隊列,因為其他工作人員正在寫入它。 沒有人知道你的算法何時完成。這就是你的循環(huán)邏輯。
mw 通道是一個緩沖通道,長度與 worker go 例程的數(shù)量相同。它充當工作池的信號量。
這里有一個競爭條件??紤]所有n
工人剛剛收到最后n
一份工作的情況。他們每個人都讀取jobschan
并檢查ok
. disptatcher
繼續(xù)運行它的select
. 沒有人現(xiàn)在正在寫信dispatchchan
或閱讀,jobschan
因此default
案件會立即匹配。 len(stack)
是0
并且沒有電流job
,因此dispatcher
關閉所有頻道,包括mw
. 此后的某個時候,一名工作人員試圖寫入一個關閉的通道并出現(xiàn)恐慌。
所以最后我準備提供一些代碼,但我還有一個問題:我沒有一個明確的問題陳述來編寫代碼。
我剛剛開始使用 Go 并發(fā)并嘗試創(chuàng)建一個調(diào)度 go 例程,該例程會將作業(yè)發(fā)送到在 jobchan 通道上偵聽的工作池。
goroutine 之間的通道就像同步齒輪的齒。但是齒輪會轉動到什么地方呢?你不是想保持時間,也不是制造發(fā)條玩具。你的齒輪可以轉動,但成功會是什么樣子?他們的轉身?
讓我們嘗試為通道定義一個更具體的用例:給定一組任意長的持續(xù)時間作為標準輸入*上的字符串,在其中一個工作n
人員中休眠那么多秒。所以我們實際上有一個結果要返回,我們會說每個工作人員將返回運行持續(xù)時間的開始和結束時間。
為了讓它可以在操場上運行,我將使用硬編碼的字節(jié)緩沖區(qū)來模擬標準輸入。
package main
import (
"bufio"
"bytes"
"fmt"
"os"
"strings"
"sync"
"time"
)
type SleepResult struct {
worker_id int
duration time.Duration
start time.Time
end time.Time
}
func main() {
var num_workers = 2
workchan := make(chan time.Duration)
resultschan := make(chan SleepResult)
var wg sync.WaitGroup
var resultswg sync.WaitGroup
resultswg.Add(1)
go results(&resultswg, resultschan)
for i := 0; i < num_workers; i++ {
wg.Add(1)
go worker(i, &wg, workchan, resultschan)
}
// playground doesn't have stdin
var input = bytes.NewBufferString(
strings.Join([]string{
"3ms",
"1 seconds",
"3600ms",
"300 ms",
"5s",
"0.05min"}, "\n") + "\n")
var scanner = bufio.NewScanner(input)
for scanner.Scan() {
text := scanner.Text()
if dur, err := time.ParseDuration(text); err != nil {
fmt.Fprintln(os.Stderr, "Invalid duration", text)
} else {
workchan <- dur
}
}
close(workchan) // we know when our inputs are done
wg.Wait() // and when our jobs are done
close(resultschan)
resultswg.Wait()
}
func results(wg *sync.WaitGroup, resultschan <-chan SleepResult) {
for res := range resultschan {
fmt.Printf("Worker %d: %s : %s => %s\n",
res.worker_id, res.duration,
res.start.Format(time.RFC3339Nano), res.end.Format(time.RFC3339Nano))
}
wg.Done()
}
func worker(id int, wg *sync.WaitGroup, jobchan <-chan time.Duration, resultschan chan<- SleepResult) {
var res = SleepResult{worker_id: id}
for dur := range jobchan {
res.duration = dur
res.start = time.Now()
time.Sleep(res.duration)
res.end = time.Now()
resultschan <- res
}
wg.Done()
}
在這里,我使用了 2 個等待組,一個用于工人,一個用于結果。這確保我在main()
結束之前完成了所有結果的編寫。我通過讓每個函數(shù)一次只做一件事來保持我的函數(shù)簡單:main 讀取輸入,從中解析持續(xù)時間,然后將它們發(fā)送給下一個 worker。該results
函數(shù)收集結果并將它們打印到標準輸出。工人負責睡眠、讀取jobchan
和寫入resultschan
。
workchan
可以緩沖(或不緩沖,如本例所示);沒關系,因為輸入將以可以處理的速度讀取。我們可以緩沖盡可能多的輸入,但我們不能緩沖無限量。我已經(jīng)將通道大小設置為1e6
- 但一百萬遠小于無限。對于我的用例,我根本不需要做任何緩沖。
main
知道輸入何時完成并可以關閉jobschan
. main
還知道作業(yè)何時完成 ( wg.Wait()
) 并可以關閉結果通道。worker
關閉這些通道是對 goroutine和goroutine的一個重要信號results
——它們可以區(qū)分一個空的通道和一個保證不會有任何新添加的通道。
for job := range jobchan {...}
是您更詳細的簡寫:
for {
job, ok := <- jobchan
if !ok {
wg.Done()
return
}
...
}
請注意,此代碼創(chuàng)建了 2 個工人,但它可以創(chuàng)建 20 個或 2000 個,甚至 1 個。無論池中有多少工人,程序都會運行。它可以處理任何數(shù)量的輸入(盡管無休止的輸入當然會導致無休止的程序)。它不會創(chuàng)建輸出到輸入的循環(huán)循環(huán)。如果您的用例需要作業(yè)來創(chuàng)建更多作業(yè),那么這是一個更具挑戰(zhàn)性的場景,通??梢酝ㄟ^仔細規(guī)劃來避免。
我希望這能給你一些關于如何在 Go 應用程序中更好地使用并發(fā)的好主意。
https://play.golang.wiki/p/cZuI9YXypxI
- 1 回答
- 0 關注
- 140 瀏覽
添加回答
舉報