3 回答

TA貢獻(xiàn)1951條經(jīng)驗(yàn) 獲得超3個(gè)贊
這是我的評(píng)論的后續(xù)內(nèi)容,在您添加示例解決方案后添加。為了比我在注釋中更清晰,您的示例代碼實(shí)際上并沒有那么糟糕。以下是您的原始示例:
// Busywait solution
for {
select {
case <-time.After(to):
if cbOneDone && cbTwoDone {
fmt.Println("Both CB executed (we could poll more often)")
return nil
}
fmt.Println("Timeout!")
return fmt.Errorf("Timeout")
case <-cbOneDoneCh:
cbOneDone = true
case <-cbTwoDoneCh:
cbTwoDone = true
}
}
這不是一個(gè)“繁忙的等待”,但它確實(shí)有幾個(gè)錯(cuò)誤(包括你需要一個(gè)只為已完成的通道發(fā)送語義的事實(shí),或者可能更容易,至少同樣好,在完成時(shí)只關(guān)閉它們一次,也許使用)。我們想做的是:sync.Once
啟動(dòng)計(jì)時(shí)器 作為超時(shí)。
to
使用計(jì)時(shí)器的通道和兩個(gè)“完成”通道輸入選擇循環(huán)。
我們希望在發(fā)生以下第一個(gè)事件時(shí)退出 select 循環(huán):
計(jì)時(shí)器觸發(fā),或
雙“完成”通道已發(fā)出信號(hào)。
如果我們要轉(zhuǎn)到兩個(gè)已完成的通道,我們也希望清除變量(設(shè)置為 ),以便選擇不會(huì)旋轉(zhuǎn) - 這將變成真正的忙碌等待 - 但目前讓我們假設(shè)我們?cè)诨卣{(diào)時(shí)只發(fā)送一次,否則只會(huì)泄漏通道, 以便我們可以按照編寫的方式使用您的代碼,因?yàn)檫@些選擇只會(huì)返回一次。以下是更新后的代碼:close
Ch
nil
t := timer.NewTimer(to)
for !cbOneDone || !cbTwoDone {
select {
case <-t.C:
fmt.Println("Timeout!")
return fmt.Errorf("timeout")
}
case <-cbOneDoneCh:
cbOneDone = true
case <-cbTwoDoneCh:
cbTwoDone = true
}
}
// insert t.Stop() and receive here to drain t.C if desired
fmt.Println("Both CB executed")
return nil
請(qǐng)注意,我們最多將經(jīng)歷兩次循環(huán):
如果我們從兩個(gè) Done 通道接收到每個(gè)通道,則循環(huán)停止而不會(huì)超時(shí)。沒有旋轉(zhuǎn)/忙碌等待:我們從未收到過任何東西。我們返回零(無錯(cuò)誤)。
t.C
如果我們從一個(gè) Done 通道接收,循環(huán)將恢復(fù),但會(huì)阻塞等待計(jì)時(shí)器或另一個(gè) Done 通道。
如果我們從 接收到 ,則表示我們尚未收到兩個(gè)回調(diào)。我們可能有一個(gè),但有一個(gè)暫停,我們選擇放棄,這是我們的目標(biāo)。我們返回一個(gè)錯(cuò)誤,而不通過循環(huán)返回。
t.C
一個(gè)真正的版本需要更多的工作來正確清理并避免泄漏“完成”通道(以及計(jì)時(shí)器通道及其goroutine;參見評(píng)論),但這是一般的想法。您已經(jīng)將回調(diào)轉(zhuǎn)換為通道操作,并且已經(jīng)具有其通道的計(jì)時(shí)器。

TA貢獻(xiàn)1934條經(jīng)驗(yàn) 獲得超2個(gè)贊
下面的代碼有兩個(gè)變體,
第一個(gè)是常規(guī)模式,沒有什么花哨的,它做了工作,做得很好。您將回調(diào)啟動(dòng)到例程中,使它們推送到接收器,收聽該接收器以獲取結(jié)果或超時(shí)。注意接收器通道的初始容量,為了防止泄漏例程,它必須與回調(diào)次數(shù)匹配。
第二個(gè)工廠將同步機(jī)制分解成小函數(shù)進(jìn)行組裝,提供兩種等待方法,waitAll 和 waitOne。寫起來不錯(cuò),但效率肯定更低,分配更多,渠道更多來回,推理更復(fù)雜,更微妙。
package main
import (
"fmt"
"log"
"sync"
"time"
)
func main() {
ExampleOne()
ExampleTwo()
ExampleThree()
fmt.Println("Hello, playground")
}
func ExampleOne() {
log.Println("start reg")
errs := make(chan error, 2)
go func() {
fn := callbackWithOpts("reg: so slow", 2*time.Second, nil)
errs <- fn()
}()
go func() {
fn := callbackWithOpts("reg: too fast", time.Millisecond, fmt.Errorf("broke!"))
errs <- fn()
}()
select {
case err := <-errs: // capture only one result,
// the fastest to finish.
if err != nil {
log.Println(err)
}
case <-time.After(time.Second): // or wait that many amount of time,
// in case they are all so slow.
}
log.Println("done reg")
}
func ExampleTwo() {
log.Println("start wait")
errs := waitAll(
withTimeout(time.Second,
callbackWithOpts("waitAll: so slow", 2*time.Second, nil),
),
withTimeout(time.Second,
callbackWithOpts("waitAll: too fast", time.Millisecond, nil),
),
)
for err := range trim(errs) {
if err != nil {
log.Println(err)
}
}
log.Println("done wait")
}
func ExampleThree() {
log.Println("start waitOne")
errs := waitOne(
withTimeout(time.Second,
callbackWithOpts("waitOne: so slow", 2*time.Second, nil),
),
withTimeout(time.Second,
callbackWithOpts("waitOne: too fast", time.Millisecond, nil),
),
)
for err := range trim(errs) {
if err != nil {
log.Println(err)
}
}
log.Println("done waitOne")
}
// a configurable callback for playing
func callbackWithOpts(msg string, tout time.Duration, err error) func() error {
return func() error {
<-time.After(tout)
fmt.Println(msg)
return err
}
}
// withTimeout return a function that returns first error or times out and return nil
func withTimeout(tout time.Duration, h func() error) func() error {
return func() error {
d := make(chan error, 1)
go func() {
d <- h()
}()
select {
case err := <-d:
return err
case <-time.After(tout):
}
return nil
}
}
// wait launches all func() and return their errors into the returned error channel; (merge)
// It is the caller responsability to drain the output error channel.
func waitAll(h ...func() error) chan error {
d := make(chan error, len(h))
var wg sync.WaitGroup
for i := 0; i < len(h); i++ {
wg.Add(1)
go func(h func() error) {
defer wg.Done()
d <- h()
}(h[i])
}
go func() {
wg.Wait()
close(d)
}()
return d
}
// wait launches all func() and return the first error into the returned error channel
// It is the caller responsability to drain the output error channel.
func waitOne(h ...func() error) chan error {
d := make(chan error, len(h))
one := make(chan error, 1)
var wg sync.WaitGroup
for i := 0; i < len(h); i++ {
wg.Add(1)
go func(h func() error) {
defer wg.Done()
d <- h()
}(h[i])
}
go func() {
for err := range d {
one <- err
close(one)
break
}
}()
go func() {
wg.Wait()
close(d)
}()
return one
}
func trim(err chan error) chan error {
out := make(chan error)
go func() {
for e := range err {
out <- e
}
close(out)
}()
return out
}

TA貢獻(xiàn)1864條經(jīng)驗(yàn) 獲得超2個(gè)贊
func wait(ctx context.Context, wg *sync.WaitGroup) error {
done := make(chan struct{}, 1)
go func() {
wg.Wait()
done <- struct{}{}
}()
select {
case <-done:
// Counter is 0, so all callbacks completed.
return nil
case <-ctx.Done():
// Context cancelled.
return ctx.Err()
}
}
或者,您可以傳遞 a 和 塊而不是 on ,但我認(rèn)為使用上下文更習(xí)慣用語。time.Duration<-time.After(d)<-ctx.Done()
- 3 回答
- 0 關(guān)注
- 107 瀏覽
添加回答
舉報(bào)