2 回答

TA貢獻(xiàn)1789條經(jīng)驗(yàn) 獲得超8個(gè)贊
您可以使用的一種簡(jiǎn)單方法是將用于將鏈接添加到頻道的代碼移動(dòng)到它自己的 go 例程中。這樣,您的主要處理可以繼續(xù),而阻塞的通道寫(xiě)入將阻塞一個(gè)單獨(dú)的 go 例程。
func process(){
for currentURL := range queue {
links, _ := ... // some http call that gets links from a url
for _, link := links {
l := link // this is important! ...
// the loop will re-set the value of link before the go routine is started
go func(l) {
queue <- link // we'll be blocked here...
// but the "parent" routine can still iterate through the channel
// which in turn un-blocks the write
}(l)
}
}
}
使用信號(hào)量示例編輯以限制 go 例程:
func main () {
maxWorkers := 5000
sem := semaphore.NewWeighted(int64(maxWorkers))
ctx := context.TODO()
for i :=0; i < 10; i++ {
go process(ctx)
}
queue <- "https://stackoverflow.com"
// block until receive some quit message
<-quit
}
func process(ctx context.Context){
for currentURL := range queue {
links, _ := ... // some http call that gets links from a url
for _, link := links {
l := link // this is important! ...
// the loop will re-set the value of link before the go routine is started
// acquire a go routine...
// if we are at the routine limit, this line will block until one becomes available
sem.Acquire(ctx, 1)
go func(l) {
defer sem.Release(1)
queue <- link // we'll be blocked here...
// but the "parent" routine can still iterate through the channel
// which in turn un-blocks the write
}(l)
}
}
}
但是這個(gè)選項(xiàng)最終可能會(huì)導(dǎo)致死鎖...假設(shè)所有的 go 例程都已聲明,父循環(huán)可能會(huì)被鎖定在sem.Acquire. 這將導(dǎo)致子例程永遠(yuǎn)不會(huì)添加到通道中,因此永遠(yuǎn)不會(huì)執(zhí)行 deferred sem.Release。在我的腦海中,我正在努力想出一個(gè)很好的方法來(lái)處理這個(gè)問(wèn)題。也許是外部?jī)?nèi)存隊(duì)列而不是通道?

TA貢獻(xiàn)1818條經(jīng)驗(yàn) 獲得超8個(gè)贊
有兩件事你可以做,要么使用緩沖通道不阻塞,即使另一端沒(méi)有人接收。這樣您就可以立即刷新通道內(nèi)的值。
一種更有效的方法是檢查通道中是否有任何可用值,或者通道是否關(guān)閉,這應(yīng)該由發(fā)送方在發(fā)送所有值時(shí)關(guān)閉。
接收者可以通過(guò)為接收表達(dá)式分配第二個(gè)參數(shù)來(lái)測(cè)試通道是否已關(guān)閉。
v, ok := <-ch
ok如果false沒(méi)有更多的值可以接收并且通道關(guān)閉。使用 select as 檢查通道內(nèi)的值
package main
import (
"fmt"
"sync"
)
var queue = make(chan int)
var wg sync.WaitGroup
func process(){
values := []int{1,2,5,3,9,7}
for _, value := range values {
queue <- value
}
}
func main () {
for i :=0; i < 10; i++ {
go process()
}
wg.Add(1)
go func(){
defer wg.Done()
for j:=0;j<30;j++ {
select {
case <-queue:
fmt.Println(<-queue)
}
}
}()
wg.Wait()
close(queue)
}
- 2 回答
- 0 關(guān)注
- 100 瀏覽
添加回答
舉報(bào)