3 回答

TA貢獻(xiàn)1841條經(jīng)驗(yàn) 獲得超3個(gè)贊
您的信號(hào)量sem不起作用,因?yàn)槟鷮?duì)其進(jìn)行了深度緩沖。
一般來(lái)說(shuō),這是為此類(lèi)任務(wù)設(shè)置映射的錯(cuò)誤方法,因?yàn)樽x取文件將是緩慢的部分。如果您有一個(gè)更復(fù)雜的任務(wù) - 例如,讀取一行,思考很多,設(shè)置一些東西 - 您會(huì)希望將其作為您的偽代碼結(jié)構(gòu):
type workType struct {
index int
line string
}
var wg sync.WaitGroup
wg.Add(nWorkers)
// I made this buffered originally but there's no real point, so
// fixing that in an edit
work := make(chan workType)
for i := 0; i < nWorkers; i++ {
go readAndDoWork(work, &wg)
}
for i := 1; fscanner.Scan(); i++ {
work <- workType{index: i, line: fscanner.Text()}
}
close(work)
wg.Wait()
... now your dictionary is ready ...
工人們這樣做:
func readAndDoWork(ch chan workType, wg *sync.WorkGroup) {
for item := range ch {
... do computation ...
insertIntoDict(item.index, result)
}
wg.Done()
}
獲取insertIntoDict互斥體(以保護(hù)從索引到結(jié)果的映射)并寫(xiě)入字典。(如果您愿意,可以將其內(nèi)聯(lián)。)
這里的想法是設(shè)置一定數(shù)量的工作線程(可能基于可用 CPU 的數(shù)量),每個(gè)工作線程獲取下一個(gè)工作項(xiàng)并處理它。主 Goroutine 只是分配工作,然后關(guān)閉工作通道——這將導(dǎo)致所有工作人員看到輸入結(jié)束——然后等待他們發(fā)出計(jì)算完成的信號(hào)。
(如果您愿意,您可以再創(chuàng)建一個(gè) goroutine 來(lái)讀取工作程序計(jì)算的結(jié)果并將其放入映射中。這樣您就不需要映射本身的互斥鎖。)

TA貢獻(xiàn)1818條經(jīng)驗(yàn) 獲得超8個(gè)贊
好吧,我已經(jīng)弄清楚了。通過(guò)復(fù)制來(lái)賦予 goroutine 一個(gè)值來(lái)保存,似乎是可行的。
改變:
for fscanner.Scan() {
text := fscanner.Text()
wg.Add(1)
sem <- 1
go func() {
mu.Lock()
defer mu.Unlock()
ourDict[indexPos] = text
indexPos++
<- sem
wg.Done()
}()
}
到
for fscanner.Scan() {
text := fscanner.Text()
wg.Add(1)
sem <- 1
go func(mypos int) {
mu.Lock()
defer mu.Unlock()
ourDict[mypos] = text
<-sem
wg.Done()
}(indexPos)
indexPos++
}
完整代碼: https: //play.golang.org/p/dkHaisPHyHz
使用工人池,
package main
import (
"bufio"
"fmt"
"log"
"os"
"sync"
)
const (
MAX = 10
fileName = "some.dat"
)
type gunk struct {
line string
id int
}
func main() {
ourDict := make(map[int]string)
wg := sync.WaitGroup{}
mu := sync.RWMutex{}
cha := make(chan gunk)
for i := 0; i < MAX; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for {
textin, ok := <-cha
if !ok {
return
}
mu.Lock()
ourDict[textin.id] = textin.line
mu.Unlock()
}
}(i)
}
f, err := os.Open(fileName)
checkerr(err)
defer f.Close()
fscanner := bufio.NewScanner(f)
indexPos := 1
for fscanner.Scan() {
text := fscanner.Text()
thisgunk := gunk{line: text, id: indexPos}
cha <- thisgunk
indexPos++
}
close(cha)
wg.Wait()
for i, v := range ourDict {
fmt.Printf("%d: %s\n", i, v)
}
}
func checkerr(err error) {
if err != nil {
fmt.Println(err)
log.Fatal(err)
}
}

TA貢獻(xiàn)1826條經(jīng)驗(yàn) 獲得超6個(gè)贊
正如我在評(píng)論中提到的,您無(wú)法控制 goroutine 的執(zhí)行順序,因此不應(yīng)從它們內(nèi)部更改索引。
這是一個(gè)示例,其中與地圖的交互在單個(gè) goroutine 中進(jìn)行,而您的處理則在其他 goroutine 中進(jìn)行:
package main
import (
"bufio"
"fmt"
"log"
"os"
"sync"
)
var (
fileName = "some.dat"
MAX = 9000
)
func checkerr(err error) {
if err != nil {
fmt.Println(err)
log.Fatal(err)
}
}
type result struct {
index int
data string
}
func main() {
ourDict := make(map[int]string)
f, err := os.Open(fileName)
checkerr(err)
defer f.Close()
fscanner := bufio.NewScanner(f)
var wg sync.WaitGroup
sem := make(chan struct{}, MAX) // Use empty structs for semaphores as they have no allocation
defer close(sem)
out := make(chan result)
defer close(out)
indexPos := 1
for fscanner.Scan() {
text := fscanner.Text()
wg.Add(1)
sem <- struct{}{}
go func(index int, data string) {
// Defer the release of your resources, otherwise if any error occur in your goroutine
// you'll have a deadlock
defer func() {
wg.Done()
<-sem
}()
// Process your data
out <- result{index, data}
}(indexPos, text) // Pass in the data that will change on the iteration, go optimizer will move it around better
indexPos++
}
// The goroutine is the only one to write to the dict, so no race condition
go func() {
for {
if entry, ok := <-out; ok {
ourDict[entry.index] = entry.data
} else {
return // Exit goroutine when channel closes
}
}
}()
wg.Wait()
for i, v := range ourDict {
fmt.Printf("%d: %s\n", i, v)
}
}
- 3 回答
- 0 關(guān)注
- 183 瀏覽
添加回答
舉報(bào)