2 回答

TA貢獻(xiàn)1783條經(jīng)驗(yàn) 獲得超4個(gè)贊
根據(jù)一些評(píng)論,您正在尋找的似乎是某種形式的批處理。
批處理有幾種情況,當(dāng)您想要獲取批處理并將其一起發(fā)送時(shí):
批量大小足夠
已經(jīng)過(guò)了足夠的時(shí)間,應(yīng)該沖洗部分批次
您給出的示例不考慮第二種情況。如果您只是因?yàn)橥V辜虞d而從不沖水,這可能會(huì)導(dǎo)致一些尷尬的行為。
因此,我建議要么查看庫(kù)(例如,cloudfoundry/go-batching),要么簡(jiǎn)單地使用通道、計(jì)時(shí)器和選擇語(yǔ)句。
package main
import (
? ? "fmt"
? ? "time"
)
func main() {
? ? ch := make(chan string)
? ? go func() {
? ? ? ? tasks := []string{}
? ? ? ? timer := time.NewTimer(time.Second) // Adjust this based on a reasonable user experience
? ? ? ? for {
? ? ? ? ? ? select {
? ? ? ? ? ? case <-timer.C:
? ? ? ? ? ? ? ? fmt.Println("Flush partial batch due to time")
? ? ? ? ? ? ? ? flush(tasks)
? ? ? ? ? ? ? ? tasks = nil
? ? ? ? ? ? ? ? timer.Reset(time.Second)
? ? ? ? ? ? case data := <-ch:
? ? ? ? ? ? ? ? tasks = append(tasks, data)
? ? ? ? ? ? ? ? // Reset the timer for each data point so that we only flush
? ? ? ? ? ? ? ? // partial batches when we stop receiving data.
? ? ? ? ? ? ? ? if !timer.Stop() {
? ? ? ? ? ? ? ? ? ? <-timer.C
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? timer.Reset(time.Second)
? ? ? ? ? ? ? ? // Guard clause to for batch size
? ? ? ? ? ? ? ? if len(tasks) < 3 {
? ? ? ? ? ? ? ? ? ? fmt.Println("Queue still too small")
? ? ? ? ? ? ? ? ? ? continue
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? flush(tasks)
? ? ? ? ? ? ? ? tasks = nil // reset tasks
? ? ? ? ? ? }
? ? ? ? }
? ? }()
? ? ch <- "Msg 1"
? ? time.Sleep(time.Second)
? ? ch <- "Msg 2"
? ? time.Sleep(time.Second)
? ? ch <- "Msg 3"
? ? time.Sleep(time.Second)
? ? ch <- "Msg 4"
? ? time.Sleep(time.Second)
}
func flush(tasks []string) {
? ? // Guard against emtpy flushes
? ? if len(tasks) == 0 {
? ? ? ? return
? ? }
? ? fmt.Println("Flush")
? ? for _, t := range tasks {
? ? ? ? fmt.Println(t)
? ? }
}

TA貢獻(xiàn)1815條經(jīng)驗(yàn) 獲得超13個(gè)贊
我可以看到批處理結(jié)果的東西是如何有用的。但它確實(shí)需要定制解決方案。有很多方法可以解決這個(gè)問(wèn)題——我試過(guò)使用Sync.WaitGroup但它變得很亂。似乎使用 async.Mutex來(lái)鎖定批處理功能是最好的方法。但是,當(dāng) mutex 是最好的答案時(shí),imo 應(yīng)該觸發(fā)對(duì)設(shè)計(jì)的重新檢查,因?yàn)?imo,它應(yīng)該是最后一個(gè)選項(xiàng)。
package main
import (
"context"
"fmt"
"sync"
"sync/atomic"
)
func main() {
ctx, canc := context.WithCancel(context.Background())
acc := NewAccumulator(4, ctx)
go func() {
for i := 0; i < 10; i++ {
acc.Write("hi")
}
canc()
}()
read := acc.ReadChan()
for batch := range read {
fmt.Println(batch)
}
fmt.Println("done")
}
type Accumulator struct {
count int64
size int
in chan string
out chan []string
ctx context.Context
doneFlag int64
mu sync.Mutex
}
func NewAccumulator(size int, parentCtx context.Context) *Accumulator {
a := &Accumulator{
size: size,
in: make(chan string, size),
out: make(chan []string, 1),
ctx: parentCtx,
}
go func() {
<-a.ctx.Done()
atomic.AddInt64(&a.doneFlag, 1)
close(a.in)
a.mu.Lock()
a.batch()
a.mu.Unlock()
close(a.out)
}()
return a
}
func (a *Accumulator) Write(s string) {
if atomic.LoadInt64(&a.doneFlag) > 0 {
panic("write to closed accumulator")
}
a.in <- s
atomic.AddInt64(&a.count, 1)
a.mu.Lock()
if atomic.LoadInt64(&a.count) == int64(a.size) {
a.batch()
}
a.mu.Unlock()
}
func (a *Accumulator) batch() {
batch := make([]string, 0)
for i := 0; i < a.size; i++ {
msg := <-a.in
if msg != "" {
batch = append(batch, msg)
}
}
fmt.Println("batching", batch)
a.out <- batch
atomic.StoreInt64(&a.count, 0)
}
func (a *Accumulator) ReadChan() <-chan []string {
return a.out
}
最好只擁有一個(gè)累積字符串的切片,當(dāng)該切片達(dá)到一定大小時(shí),然后開始一些處理。
- 2 回答
- 0 關(guān)注
- 151 瀏覽
添加回答
舉報(bào)