3 回答

TA貢獻(xiàn)1848條經(jīng)驗(yàn) 獲得超10個(gè)贊
在其他答案之上。
請(qǐng)(非常)小心,關(guān)閉通道應(yīng)該發(fā)生在寫(xiě)入調(diào)用站點(diǎn)上,而不是讀取調(diào)用站點(diǎn)上。在正在寫(xiě)入的GoCountColumns通道中r,關(guān)閉通道的責(zé)任落在GoCountColumns函數(shù)上。技術(shù)原因是,它是唯一確定該通道將不再被寫(xiě)入的參與者,因此可以安全關(guān)閉。
func GoCountColumns(in chan []string, r chan Result, quit chan int) {
defer close(r) // this line.
for {
select {
case data := <-in:
r <- countColumns(data) // some calculation function
case <-quit:
return // stop goroutine
}
}
}
如果我可以說(shuō),函數(shù)參數(shù)命名約定是將目標(biāo)作為第一個(gè)參數(shù),將源作為第二個(gè)參數(shù),然后使用其他參數(shù)。GoCountColumns優(yōu)選地寫(xiě)成:
func GoCountColumns(dst chan Result, src chan []string, quit chan int) {
defer close(dst)
for {
select {
case data := <-src:
dst <- countColumns(data) // some calculation function
case <-quit:
return // stop goroutine
}
}
}
quit您在流程開(kāi)始后立即致電。這是不合邏輯的。該quit命令是強(qiáng)制退出序列,一旦檢測(cè)到退出信號(hào)就應(yīng)該調(diào)用它,以盡可能以最佳狀態(tài)(可能全部損壞)強(qiáng)制退出當(dāng)前處理。換句話說(shuō),您應(yīng)該依賴該signal.Notify包來(lái)捕獲退出事件,并通知您的工作人員退出。請(qǐng)參閱https://golang.org/pkg/os/signal/#example_Notify
為了編寫(xiě)更好的并行代碼,首先列出管理程序生命周期所需的例程,確定需要阻塞的例程以確保程序在退出之前完成。
在您的代碼中,存在read, map。為了確保處理完整,程序主函數(shù)必須確保在退出時(shí)捕獲信號(hào),map然后再退出。請(qǐng)注意,該read功能并不重要。
然后,您還需要從用戶輸入捕獲退出事件所需的代碼。
總的來(lái)說(shuō),我們似乎需要阻止兩個(gè)事件來(lái)管理生命周期。示意性地說(shuō),
func main(){
go read()
go map(mapDone)
go signal()
select {
case <-mapDone:
case <-sig:
}
}
這個(gè)簡(jiǎn)單的代碼很好process or die。事實(shí)上,當(dāng)捕獲到用戶事件時(shí),程序立即退出,而不給其他例程機(jī)會(huì)執(zhí)行停止時(shí)所需的操作。
為了改善這些行為,您首先需要一種方法來(lái)表明程序想要離開(kāi)其他例程,其次需要一種方法來(lái)等待這些例程在離開(kāi)之前完成其停止序列。
要發(fā)出退出事件或取消信號(hào),您可以使用 a context.Context,將其傳遞給工作人員,讓他們聽(tīng)。
再次,示意性地,
func main(){
ctx,cancel := context.WithCancel(context.WithBackground())
go read(ctx)
go map(ctx,mapDone)
go signal()
select {
case <-mapDone:
case <-sig:
cancel()
}
}
(稍后將詳細(xì)閱讀和繪制地圖)
要等待完成,很多事情都是可能的,只要它們是線程安全的。通常,sync.WaitGroup使用 a?;蛘撸谙衲@樣的情況下,只有一個(gè)例程需要等待,我們可以重新使用當(dāng)前mapDone通道。
func main(){
ctx,cancel := context.WithCancel(context.WithBackground())
go read(ctx)
go map(ctx,mapDone)
go signal()
select {
case <-mapDone:
case <-sig:
cancel()
<-mapDone
}
}
這很簡(jiǎn)單也很直接。但這并不完全正確。最后一個(gè)mapDone chan可能會(huì)永遠(yuǎn)阻塞并使程序無(wú)法停止。因此,您可以實(shí)現(xiàn)第二個(gè)信號(hào)處理程序或超時(shí)。
示意性地,超時(shí)解決方案是
func main(){
ctx,cancel := context.WithCancel(context.WithBackground())
go read(ctx)
go map(ctx,mapDone)
go signal()
select {
case <-mapDone:
case <-sig:
cancel()
select {
case <-mapDone:
case <-time.After(time.Second):
}
}
}
您還可以在最后一次選擇中累積信號(hào)處理和超時(shí)。
最后,有幾件事要講read和map上下文聆聽(tīng)。
首先map,實(shí)現(xiàn)需要定期讀取context.Done通道來(lái)檢測(cè)cancellation。
這是簡(jiǎn)單的部分,只需要更新 select 語(yǔ)句。
func GoCountColumns(ctx context.Context, dst chan Result, src chan []string) {
defer close(dst)
for {
select {
case <-ctx.Done():
<-time.After(time.Minute) // do something more useful.
return // quit. Notice the defer will be called.
case data := <-src:
dst <- countColumns(data) // some calculation function
}
}
}
現(xiàn)在這read部分有點(diǎn)棘手,因?yàn)樗且粋€(gè) IO,它不提供select強(qiáng)大的編程接口,并且監(jiān)聽(tīng)上下文通道取消可能看起來(lái)很矛盾。這是。由于 IO 是阻塞的,因此無(wú)法偵聽(tīng)上下文。并且在從上下文通道讀取時(shí),無(wú)法讀取 IO。在您的情況下,解決方案需要了解您的讀取循環(huán)與您的程序生命周期無(wú)關(guān)(還記得我們只監(jiān)聽(tīng)mapDone嗎?),并且我們可以忽略上下文。
在其他情況下,例如,如果您想在讀取最后一個(gè)字節(jié)時(shí)重新啟動(dòng)(因此在每次讀取時(shí),我們都會(huì)增加 n,計(jì)算字節(jié)數(shù),并且我們希望在停止時(shí)保存該值)。然后,需要啟動(dòng)一個(gè)新的例程,因此,多個(gè)例程需要等待完成。在這種情況下,async.WaitGroup會(huì)更合適。
示意性地說(shuō),
func main(){
var wg sync.WaitGroup
processDone:=make(chan struct{})
ctx,cancel := context.WithCancel(context.WithBackground())
go read(ctx)
wg.Add(1)
go saveN(ctx,&wg)
wg.Add(1)
go map(ctx,&wg)
go signal()
go func(){
wg.Wait()
close(processDone)
}()
select {
case <-processDone:
case <-sig:
cancel()
select {
case <-processDone:
case <-time.After(time.Second):
}
}
}
在最后的代碼中,正在傳遞等待組。例程負(fù)責(zé)調(diào)用wg.Done(),當(dāng)所有例程完成后,processDone通道關(guān)閉,以發(fā)出選擇信號(hào)。
func GoCountColumns(ctx context.Context, dst chan Result, src chan []string, wg *sync.WaitGroup) {
defer wg.Done()
defer close(dst)
for {
select {
case <-ctx.Done():
<-time.After(time.Minute) // do something more useful.
return // quit. Notice the defer will be called.
case data := <-src:
dst <- countColumns(data) // some calculation function
}
}
}
尚未確定哪種模式是首選,但您也可能會(huì)看到waitgroup僅在調(diào)用站點(diǎn)進(jìn)行管理。
func main(){
var wg sync.WaitGroup
processDone:=make(chan struct{})
ctx,cancel := context.WithCancel(context.WithBackground())
go read(ctx)
wg.Add(1)
go func(){
defer wg.Done()
saveN(ctx)
}()
wg.Add(1)
go func(){
defer wg.Done()
map(ctx)
}()
go signal()
go func(){
wg.Wait()
close(processDone)
}()
select {
case <-processDone:
case <-sig:
cancel()
select {
case <-processDone:
case <-time.After(time.Second):
}
}
}
除了所有這些問(wèn)題和 OP 問(wèn)題之外,您必須始終預(yù)先評(píng)估并行處理對(duì)于給定任務(wù)的相關(guān)性。沒(méi)有獨(dú)特的秘訣,練習(xí)和衡量你的代碼性能。參見(jiàn) pprof.

TA貢獻(xiàn)1850條經(jīng)驗(yàn) 獲得超11個(gè)贊
這段代碼中發(fā)生的事情太多了。您應(yīng)該將代碼重組為服務(wù)于特定目的的短函數(shù),以便其他人可以輕松地幫助您(也可以幫助您自己)。
有多種方法可以讓一個(gè) go-routine 等待其他工作完成。最常見(jiàn)的方法是使用等待組(我提供的示例)或通道。
func processSomething(...) {
? ? ...
}
func main() {
? ? workers := &sync.WaitGroup{}
? ? for i := 0; i < numWorkers; i++ {
? ? ? ? workers.Add(1) // you want to call this from the calling go-routine and before spawning the worker go-routine
? ? ? ? go func() {
? ? ? ? ? ? defer workers.Done() // you want to call this from the worker go-routine when the work is done (NOTE the defer, which ensures it is called no matter what)
? ? ? ? ? ? processSomething(....) // your async processing
? ? ? ? }()
? ? }
? ? // this will block until all workers have finished their work
? ? workers.Wait()
}

TA貢獻(xiàn)1775條經(jīng)驗(yàn) 獲得超8個(gè)贊
您可以使用通道來(lái)阻塞,main直到 Goroutine 完成。
package main
import (
"log"
"time"
)
func main() {
c := make(chan struct{})
go func() {
time.Sleep(3 * time.Second)
log.Println("bye")
close(c)
}()
// This blocks until the channel is closed by the routine
<-c
}
無(wú)需向通道寫(xiě)入任何內(nèi)容。讀取會(huì)被阻塞,直到讀取數(shù)據(jù)或者我們?cè)谶@里使用的通道關(guān)閉為止。
- 3 回答
- 0 關(guān)注
- 186 瀏覽
添加回答
舉報(bào)