第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定
已解決430363個(gè)問(wèn)題,去搜搜看,總會(huì)有你想問(wèn)的

在順序執(zhí)行之前等待通道中的 N 個(gè)項(xiàng)目

在順序執(zhí)行之前等待通道中的 N 個(gè)項(xiàng)目

Go
慕哥9229398 2023-05-08 15:49:57
所以我很新去!但是我對(duì)我想嘗試的事情有這個(gè)想法。我想要一個(gè)從通道接受字符串的 go 例程,但只有在它收到 N 個(gè)字符串后才應(yīng)該對(duì)它們執(zhí)行。我四處尋找類似的問(wèn)題或案例,但我只發(fā)現(xiàn)了一些想法是并行執(zhí)行多個(gè)例程并等待匯總結(jié)果。我想到了創(chuàng)建一個(gè)數(shù)組并將其傳遞給長(zhǎng)度足夠的例程的想法。但是我想保持一定的關(guān)注點(diǎn)分離并在接收端控制它。我的問(wèn)題是。這是出于某種原因的不良做法嗎?有沒(méi)有更好的方法來(lái)做到這一點(diǎn),它是什么?func main() {    ch := make(chan string)    go func() {        tasks := []string{}        for {            tasks = append(tasks,<- ch)            if len(tasks) < 3 {                fmt.Println("Queue still to small")            }            if len(tasks) > 3 {                for i := 0; i < len(tasks); i++ {                    fmt.Println(tasks[i])                }            }        }    }()    ch <- "Msg 1"    time.Sleep(time.Second)    ch <- "Msg 2"    time.Sleep(time.Second)    ch <- "Msg 3"    time.Sleep(time.Second)    ch <- "Msg 4"    time.Sleep(time.Second)}編輯更簡(jiǎn)單更準(zhǔn)確的例子。
查看完整描述

2 回答

?
慕娘9325324

TA貢獻(xiàn)1783條經(jīng)驗(yàn) 獲得超4個(gè)贊

根據(jù)一些評(píng)論,您正在尋找的似乎是某種形式的批處理。

批處理有幾種情況,當(dāng)您想要獲取批處理并將其一起發(fā)送時(shí):

  1. 批量大小足夠

  2. 已經(jīng)過(guò)了足夠的時(shí)間,應(yīng)該沖洗部分批次

您給出的示例不考慮第二種情況。如果您只是因?yàn)橥V辜虞d而從不沖水,這可能會(huì)導(dǎo)致一些尷尬的行為。

因此,我建議要么查看庫(kù)(例如,cloudfoundry/go-batching),要么簡(jiǎn)單地使用通道、計(jì)時(shí)器和選擇語(yǔ)句。

package main


import (

? ? "fmt"

? ? "time"

)


func main() {

? ? ch := make(chan string)

? ? go func() {

? ? ? ? tasks := []string{}

? ? ? ? timer := time.NewTimer(time.Second) // Adjust this based on a reasonable user experience

? ? ? ? for {

? ? ? ? ? ? select {

? ? ? ? ? ? case <-timer.C:

? ? ? ? ? ? ? ? fmt.Println("Flush partial batch due to time")

? ? ? ? ? ? ? ? flush(tasks)

? ? ? ? ? ? ? ? tasks = nil

? ? ? ? ? ? ? ? timer.Reset(time.Second)

? ? ? ? ? ? case data := <-ch:

? ? ? ? ? ? ? ? tasks = append(tasks, data)


? ? ? ? ? ? ? ? // Reset the timer for each data point so that we only flush

? ? ? ? ? ? ? ? // partial batches when we stop receiving data.

? ? ? ? ? ? ? ? if !timer.Stop() {

? ? ? ? ? ? ? ? ? ? <-timer.C

? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? timer.Reset(time.Second)


? ? ? ? ? ? ? ? // Guard clause to for batch size

? ? ? ? ? ? ? ? if len(tasks) < 3 {

? ? ? ? ? ? ? ? ? ? fmt.Println("Queue still too small")

? ? ? ? ? ? ? ? ? ? continue

? ? ? ? ? ? ? ? }


? ? ? ? ? ? ? ? flush(tasks)

? ? ? ? ? ? ? ? tasks = nil // reset tasks

? ? ? ? ? ? }

? ? ? ? }

? ? }()


? ? ch <- "Msg 1"

? ? time.Sleep(time.Second)

? ? ch <- "Msg 2"

? ? time.Sleep(time.Second)

? ? ch <- "Msg 3"

? ? time.Sleep(time.Second)

? ? ch <- "Msg 4"

? ? time.Sleep(time.Second)

}


func flush(tasks []string) {

? ? // Guard against emtpy flushes

? ? if len(tasks) == 0 {

? ? ? ? return

? ? }


? ? fmt.Println("Flush")

? ? for _, t := range tasks {

? ? ? ? fmt.Println(t)

? ? }

}


查看完整回答
反對(duì) 回復(fù) 2023-05-08
?
蕭十郎

TA貢獻(xiàn)1815條經(jīng)驗(yàn) 獲得超13個(gè)贊

我可以看到批處理結(jié)果的東西是如何有用的。但它確實(shí)需要定制解決方案。有很多方法可以解決這個(gè)問(wèn)題——我試過(guò)使用Sync.WaitGroup但它變得很亂。似乎使用 async.Mutex來(lái)鎖定批處理功能是最好的方法。但是,當(dāng) mutex 是最好的答案時(shí),imo 應(yīng)該觸發(fā)對(duì)設(shè)計(jì)的重新檢查,因?yàn)?imo,它應(yīng)該是最后一個(gè)選項(xiàng)。


package main


import (

    "context"

    "fmt"

    "sync"

    "sync/atomic"

)


func main() {


    ctx, canc := context.WithCancel(context.Background())

    acc := NewAccumulator(4, ctx)

    go func() {

        for i := 0; i < 10; i++ {

            acc.Write("hi")

        }

        canc()

    }()


    read := acc.ReadChan()

    for batch := range read {

        fmt.Println(batch)

    }

    fmt.Println("done")

}


type Accumulator struct {

    count    int64

    size     int

    in       chan string

    out      chan []string

    ctx      context.Context

    doneFlag int64

    mu   sync.Mutex

}


func NewAccumulator(size int, parentCtx context.Context) *Accumulator {

    a := &Accumulator{

        size: size,

        in:   make(chan string, size),

        out:  make(chan []string, 1),

        ctx:  parentCtx,

    }


    go func() {

        <-a.ctx.Done()

        atomic.AddInt64(&a.doneFlag, 1)

        close(a.in)

        a.mu.Lock()

        a.batch()

        a.mu.Unlock()

        close(a.out)

    }()

    return a

}


func (a *Accumulator) Write(s string) {

    if atomic.LoadInt64(&a.doneFlag) > 0 {

        panic("write to closed accumulator")

    }

    a.in <- s

    atomic.AddInt64(&a.count, 1)

    a.mu.Lock()

    if atomic.LoadInt64(&a.count) == int64(a.size) {

        a.batch()

    }

    a.mu.Unlock()

}


func (a *Accumulator) batch() {

    batch := make([]string, 0)

    for i := 0; i < a.size; i++ {

        msg := <-a.in

        if msg != "" {

            batch = append(batch, msg)

        }

    }

    fmt.Println("batching", batch)

    a.out <- batch

    atomic.StoreInt64(&a.count, 0)

}


func (a *Accumulator) ReadChan() <-chan []string {

    return a.out

}

最好只擁有一個(gè)累積字符串的切片,當(dāng)該切片達(dá)到一定大小時(shí),然后開始一些處理。


查看完整回答
反對(duì) 回復(fù) 2023-05-08
  • 2 回答
  • 0 關(guān)注
  • 151 瀏覽
慕課專欄
更多

添加回答

舉報(bào)

0/150
提交
取消
微信客服

購(gòu)課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)