3 回答

TA貢獻(xiàn)1810條經(jīng)驗(yàn) 獲得超4個(gè)贊
鎖定的全球地圖為您提供了良好的開端。您可以為每個(gè)“事務(wù)”設(shè)置一個(gè)工作器,處理程序通過通道向他們發(fā)送請(qǐng)求,使用鎖定的地圖來跟蹤通道。工作人員可以在收到特殊請(qǐng)求時(shí)關(guān)閉交易。您不希望懸空事務(wù)成為問題,因此您應(yīng)該安排在超時(shí)后發(fā)送人為關(guān)閉請(qǐng)求。
這不是唯一的方法,盡管它可能很方便。如果您只需要在其他地方處理事務(wù)時(shí)讓某些請(qǐng)求等待,那么可能有一個(gè)帶有*sync.Mutexes映射的構(gòu)造,而不是與工作程序 goroutine 通信的通道,這樣可以更好地利用資源。(現(xiàn)在在 bgp 的回答中或多或少有這種方法的代碼。)
渠道方法的一個(gè)例子如下;除了在每個(gè)事務(wù)中序列化工作之外,它還演示了如何使用close和sync.WaitGroup為這樣的設(shè)置進(jìn)行正常關(guān)閉和超時(shí)。它在操場(chǎng)上。
package main
import (
"fmt"
"log"
"sync"
"time"
)
// Req represents a request. In real use, if there are many kinds of requests, it might be or contain an interface value that can point to one of several different concrete structs.
type Req struct {
id int
payload string // just for demo
// ...
}
// Worker represents worker state.
type Worker struct {
id int
reqs chan *Req
// ...
}
var tasks = map[int]chan *Req{}
var tasksLock sync.Mutex
const TimeoutDuration = 100 * time.Millisecond // to demonstrate; in reality higher
// for graceful shutdown, you probably want to be able to wait on all workers to exit
var tasksWg sync.WaitGroup
func (w *Worker) Work() {
defer func() {
tasksLock.Lock()
delete(tasks, w.id)
if r := recover(); r != nil {
log.Println("worker panic (continuing):", r)
}
tasksLock.Unlock()
tasksWg.Done()
}()
for req := range w.reqs {
// ...do work...
fmt.Println("worker", w.id, "handling request", req)
if req.payload == "close" {
fmt.Println("worker", w.id, "quitting because of a close req")
return
}
}
fmt.Println("worker", w.id, "quitting since its channel was closed")
}
// Handle dispatches the Request to a Worker, creating one if needed.
func (r *Req) Handle() {
tasksLock.Lock()
defer tasksLock.Unlock()
id := r.id
reqs := tasks[id]
if reqs == nil {
// making a buffered channel here would let you queue up
// n tasks for a given ID before the the Handle() call
// blocks
reqs = make(chan *Req)
tasks[id] = reqs
w := &Worker{
id: id,
reqs: reqs,
}
tasksWg.Add(1)
go w.Work()
time.AfterFunc(TimeoutDuration, func() {
tasksLock.Lock()
if reqs := tasks[id]; reqs != nil {
close(reqs)
delete(tasks, id)
}
tasksLock.Unlock()
})
}
// you could close(reqs) if you get a request that means
// 'end the transaction' with no further info. I'm only
// using close for graceful shutdown, though.
reqs <- r
}
// Shutdown asks the workers to shut down and waits.
func Shutdown() {
tasksLock.Lock()
for id, w := range tasks {
close(w)
// delete so timers, etc. won't see a ghost of a task
delete(tasks, id)
}
// must unlock b/c workers can't finish shutdown
// until they can remove themselves from maps
tasksLock.Unlock()
tasksWg.Wait()
}
func main() {
fmt.Println("Hello, playground")
reqs := []*Req{
{id: 1, payload: "foo"},
{id: 2, payload: "bar"},
{id: 1, payload: "baz"},
{id: 1, payload: "close"},
// worker 2 will get closed because of timeout
}
for _, r := range reqs {
r.Handle()
}
time.Sleep(75*time.Millisecond)
r := &Req{id: 3, payload: "quux"}
r.Handle()
fmt.Println("worker 2 should get closed by timeout")
time.Sleep(75*time.Millisecond)
fmt.Println("worker 3 should get closed by shutdown")
Shutdown()
}

TA貢獻(xiàn)1839條經(jīng)驗(yàn) 獲得超15個(gè)贊
需要保持一個(gè)全局互斥鎖來鎖定對(duì)并發(fā)請(qǐng)求發(fā)生的映射的訪問,然后從那里使用互斥鎖或計(jì)數(shù)器,然后確保它沒有死鎖,然后垃圾收集(或仔細(xì)引用計(jì)數(shù))舊請(qǐng)求條目
這似乎過于復(fù)雜了。這是我將如何做到的:
所有地圖內(nèi)容都應(yīng)該由一個(gè)線程(您的調(diào)度程序)處理,因此您不必處理鎖定。這假設(shè)工作時(shí)間遠(yuǎn)大于調(diào)度時(shí)間。調(diào)度程序跟蹤每個(gè) ID 的通道和計(jì)數(shù)器(顯然在地圖中)。
唯一的復(fù)雜問題是如何處理“goroutine 認(rèn)為它已經(jīng)完成了 ID 的工作”與“調(diào)度員剛剛發(fā)現(xiàn)更多工作”的競(jìng)爭(zhēng)。答案是工作人員請(qǐng)求清理,但調(diào)度員決定清理請(qǐng)求是否可能。
所以這里是代碼的工作方式:
1) 調(diào)度進(jìn)程從單個(gè)輸入通道讀取。它獲得兩種類型的請(qǐng)求:“新工作”(來自外部)和“完成工作”(來自工作人員)。兩個(gè)請(qǐng)求都包含一個(gè) ID。
2) Dispatcher 收到“New Work”消息:通過 ID 在地圖中查找。如果您找到一個(gè)頻道 + 一個(gè)計(jì)數(shù),則將作品發(fā)送到該頻道并增加計(jì)數(shù)。(*) 如果你什么也沒找到,在地圖中創(chuàng)建一個(gè)新的通道 + 計(jì)數(shù),將工作發(fā)送到通道(也增加計(jì)數(shù)),然后在該通道上創(chuàng)建一個(gè)工作程序(go-routine)讀取。
3)worker goroutine 顯然會(huì)從通道中拉出“新工作”并完成工作。完成后,它將向 Dispatcher 發(fā)送“完成工作”請(qǐng)求。
4) 調(diào)度員收到“完成工作”消息。在地圖中查找并找到頻道 + 計(jì)數(shù)器。減少計(jì)數(shù)器。如果為零,則向工作人員發(fā)送“退出”消息,并刪除地圖中的條目。
5) 如果工作 goroutine 收到“退出”消息(而不是工作消息),它就會(huì)直接退出。(請(qǐng)注意,當(dāng)舊工人退出時(shí),可以在該 ID 上創(chuàng)建第二個(gè)工人的競(jìng)爭(zhēng)很小。但舊工人只會(huì)處理退出消息,所以沒關(guān)系。舊工人會(huì)清理自己向上,包括舊頻道。)
如果您的請(qǐng)求足夠慢,則地圖中一次將只有一個(gè)條目。另一個(gè)極端是,如果您對(duì)同一 ID 的請(qǐng)求足夠快,則該 ID 的通道將保持活動(dòng)狀態(tài)(只是計(jì)數(shù)器會(huì)上下波動(dòng))。
(*) 注意:如果您將頻道設(shè)置為 5 深,并且 6 條消息排隊(duì),調(diào)度程序?qū)⑼V埂N艺J(rèn)為在這種情況下您可以擴(kuò)大頻道深度,但我不確定。
- 3 回答
- 0 關(guān)注
- 190 瀏覽
添加回答
舉報(bào)