2 回答

TA貢獻(xiàn)1900條經(jīng)驗(yàn) 獲得超5個(gè)贊
使用以下內(nèi)容將字符串通道轉(zhuǎn)換為讀取器:
type chanReader struct {
c chan string
buf string
}
func (r *chanReader) Read(p []byte) (int, error) {
// Fill the buffer when we have no data to return to the caller
if len(r.buf) == 0 {
var ok bool
r.buf, ok = <-r.c
if !ok {
// Return eof on channel closed
return 0, io.EOF
}
}
n := copy(p, r.buf)
r.buf = r.buf[n:]
return n, nil
}
像這樣使用它:
r := csv.NewReader(&chanReader{c: feederChan})
for {
a, err := r.Read()
if err != nil {
// handle error, break out of loop
}
// do something with a
}
如果應(yīng)用程序假定換行符分隔從通道接收的值,則將換行符附加到每個(gè)接收到的值:
...
var ok bool
r.buf, ok = <-r.c
if !ok {
// Return eof on channel closed
return 0, io.EOF
}
r.buf += "\n"
...
復(fù)制+= "\n"字符串。如果這不能滿足應(yīng)用程序的效率要求,則引入一個(gè)新字段來管理行分隔符。
type chanReader struct {
c chan string // source of lines
buf string // the current line
nl bool // true if line separator is pending
}
func (r *chanReader) Read(p []byte) (int, error) {
// Fill the buffer when we have no data to return to the caller
if len(r.buf) == 0 && !r.nl {
var ok bool
r.buf, ok = <-r.c
if !ok {
// Return eof on channel closed
return 0, io.EOF
}
r.nl = true
}
// Return data if we have it
if len(r.buf) > 0 {
n := copy(p, r.buf)
r.buf = r.buf[n:]
return n, nil
}
// No data, return the line separator
n := copy(p, "\n")
r.nl = n == 0
return n, nil
}
另一種方法是按照問題評(píng)論中的建議,使用 io.Pipe 和 goroutine 將通道轉(zhuǎn)換為 io.Reader。這種方法的第一步是:
var nl = []byte("\n")
func createChanReader(c chan string) io.Reader {
r, w := io.Pipe()
go func() {
defer w.Close()
for s := range c {
io.WriteString(w, s)
w.Write(nl)
}
}
}()
return r
}
像這樣使用它:
r := csv.NewReader(createChanReader(feederChan))
for {
a, err := r.Read()
if err != nil {
// handle error, break out of loop
}
// do something with a
}
當(dāng)應(yīng)用程序在將管道讀取到 EOF 之前退出循環(huán)時(shí), io.Pipe 解決方案的第一遍會(huì)泄漏 goroutine。應(yīng)用程序可能會(huì)提前中斷,因?yàn)?CSV 閱讀器檢測(cè)到語法錯(cuò)誤,應(yīng)用程序由于程序員錯(cuò)誤或任何其他原因而崩潰。
要修復(fù) goroutine 泄漏,請(qǐng)?jiān)趯懭脲e(cuò)誤時(shí)退出寫入 goroutine,并在完成讀取后關(guān)閉管道讀取器。
var nl = []byte("\n")
func createChanReader(c chan string) *io.PipeReader {
r, w := io.Pipe()
go func() {
defer w.Close()
for s := range c {
if _, err := io.WriteString(w, s); err != nil {
return
}
if _, err := w.Write(nl); err != nil {
return
}
}
}()
return r
}
像這樣使用它:
cr := createChanReader(feederChan)
defer cr.Close() // Required for goroutine cleanup
r := csv.NewReader(cr)
for {
a, err := r.Read()
if err != nil {
// handle error, break out of loop
}
// do something with a
}

TA貢獻(xiàn)1825條經(jīng)驗(yàn) 獲得超4個(gè)贊
我最終還是使用了 io.Pipe() “正如 mh-cbon 提到的那樣”,它更簡單并且看起來更有效(如下所述):
rp, wp := io.Pipe()
go func() {
? ? defer wp.Close()
? ? for i := range feederChan {
? ? ? ? fmt.Fprintln(wp, i)
? ? }
}()
r := csv.NewReader(rp)
for { // keep reading
? ? a, err := r.Read()
? ? if err == io.EOF {
? ? ? ? break
? ? }
? ? // do stuff with 'a'
? ? // ...
}
io.Pipe() 是同步的,并且應(yīng)該相當(dāng)高效:它將數(shù)據(jù)從寫入器通過管道傳輸?shù)阶x取器;我將 csv.NewReader() 提供給讀者部分,并創(chuàng)建了一個(gè) goroutine,將 chan 寫入到作者部分。
多謝。
- 2 回答
- 0 關(guān)注
- 135 瀏覽
添加回答
舉報(bào)