第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定
已解決430363個(gè)問題,去搜搜看,總會(huì)有你想問的

運(yùn)行代碼的算法,每個(gè)唯一 ID 不超過一個(gè)并發(fā)線程

運(yùn)行代碼的算法,每個(gè)唯一 ID 不超過一個(gè)并發(fā)線程

Go
蝴蝶刀刀 2021-10-25 16:27:02
我有一個(gè) Go Web 應(yīng)用程序,它需要在每個(gè)唯一 ID 的一個(gè) goroutine 中執(zhí)行給定的代碼段。這種情況是我的請(qǐng)求帶有代表某種事務(wù)的各種 ID。需要保證對(duì)這些操作的某個(gè)子集僅針對(duì)給定 ID 運(yùn)行“一次一個(gè)”(并且其他競(jìng)爭(zhēng)請(qǐng)求應(yīng)阻塞,直到前一個(gè)處理/針對(duì)該 ID 的操作完成)。我可以想到幾種方法來做到這一點(diǎn),但簿記似乎很棘手 - 需要保持一個(gè)全局互斥鎖來鎖定對(duì)正在發(fā)生的并發(fā)請(qǐng)求的映射的訪問,然后從那里使用互斥鎖或計(jì)數(shù)器,然后確保它不會(huì)死鎖,然后垃圾收集(或仔細(xì)引用計(jì)數(shù))舊請(qǐng)求條目。我可以這樣做,但聽起來容易出錯(cuò)。在這種情況下,標(biāo)準(zhǔn)庫中是否有可以輕松使用的模式或其他東西?沒有看到任何明顯的東西。編輯:我認(rèn)為在上面的解釋中令人困惑的一件事是“交易”一詞的使用。在我的情況下,這些中的每一個(gè)都不需要顯式關(guān)閉 - 它只是一個(gè)與多個(gè)操作相關(guān)聯(lián)的標(biāo)識(shí)符。由于我沒有明確的“關(guān)閉”或“結(jié)束”概念,我可能會(huì)在同一秒內(nèi)收到 3 個(gè)請(qǐng)求,每個(gè)操作需要 2 秒 - 我需要序列化它們,因?yàn)橥瑫r(shí)運(yùn)行它們會(huì)造成嚴(yán)重破壞;但是一周后我可能會(huì)收到一個(gè)具有相同 ID 的請(qǐng)求,并且它會(huì)引用相同的操作集(ID 只是數(shù)據(jù)庫表上的 PK)。
查看完整描述

3 回答

?
蝴蝶不菲

TA貢獻(xiàn)1810條經(jīng)驗(yàn) 獲得超4個(gè)贊

鎖定的全球地圖為您提供了良好的開端。您可以為每個(gè)“事務(wù)”設(shè)置一個(gè)工作器,處理程序通過通道向他們發(fā)送請(qǐng)求,使用鎖定的地圖來跟蹤通道。工作人員可以在收到特殊請(qǐng)求時(shí)關(guān)閉交易。您不希望懸空事務(wù)成為問題,因此您應(yīng)該安排在超時(shí)后發(fā)送人為關(guān)閉請(qǐng)求。


這不是唯一的方法,盡管它可能很方便。如果您只需要在其他地方處理事務(wù)時(shí)讓某些請(qǐng)求等待,那么可能有一個(gè)帶有*sync.Mutexes映射的構(gòu)造,而不是與工作程序 goroutine 通信的通道,這樣可以更好地利用資源。(現(xiàn)在在 bgp 的回答中或多或少有這種方法的代碼。)


渠道方法的一個(gè)例子如下;除了在每個(gè)事務(wù)中序列化工作之外,它還演示了如何使用close和sync.WaitGroup為這樣的設(shè)置進(jìn)行正常關(guān)閉和超時(shí)。它在操場(chǎng)上。


package main


import (

    "fmt"

    "log"

    "sync"

    "time"

)


// Req represents a request. In real use, if there are many kinds of requests, it might be or contain an interface value that can point to one of several different concrete structs.

type Req struct {

    id      int

    payload string // just for demo

    // ...

}


// Worker represents worker state.

type Worker struct {

    id   int

    reqs chan *Req

    // ...

}


var tasks = map[int]chan *Req{}

var tasksLock sync.Mutex


const TimeoutDuration = 100 * time.Millisecond // to demonstrate; in reality higher


// for graceful shutdown, you probably want to be able to wait on all workers to exit

var tasksWg sync.WaitGroup


func (w *Worker) Work() {

    defer func() {

        tasksLock.Lock()

        delete(tasks, w.id)

        if r := recover(); r != nil {

            log.Println("worker panic (continuing):", r)

        }

        tasksLock.Unlock()

        tasksWg.Done()

    }()

    for req := range w.reqs {

        // ...do work...

        fmt.Println("worker", w.id, "handling request", req)

        if req.payload == "close" {

            fmt.Println("worker", w.id, "quitting because of a close req")

            return

        }

    }

    fmt.Println("worker", w.id, "quitting since its channel was closed")

}


// Handle dispatches the Request to a Worker, creating one if needed.

func (r *Req) Handle() {

    tasksLock.Lock()

    defer tasksLock.Unlock()

    id := r.id

    reqs := tasks[id]

    if reqs == nil {

        // making a buffered channel here would let you queue up

        // n tasks for a given ID before the the Handle() call

        // blocks

        reqs = make(chan *Req)

        tasks[id] = reqs

        w := &Worker{

            id:   id,

            reqs: reqs,

        }

        tasksWg.Add(1)

        go w.Work()

        time.AfterFunc(TimeoutDuration, func() {

            tasksLock.Lock()

            if reqs := tasks[id]; reqs != nil {

                close(reqs)

                delete(tasks, id)

            }

            tasksLock.Unlock()

        })

    }

    // you could close(reqs) if you get a request that means

    // 'end the transaction' with no further info. I'm only

    // using close for graceful shutdown, though.

    reqs <- r

}


// Shutdown asks the workers to shut down and waits.

func Shutdown() {

    tasksLock.Lock()

    for id, w := range tasks {

        close(w)

        // delete so timers, etc. won't see a ghost of a task

        delete(tasks, id)

    }

    // must unlock b/c workers can't finish shutdown

    // until they can remove themselves from maps

    tasksLock.Unlock()

    tasksWg.Wait()

}


func main() {

    fmt.Println("Hello, playground")

    reqs := []*Req{

        {id: 1, payload: "foo"},

        {id: 2, payload: "bar"},

        {id: 1, payload: "baz"},

        {id: 1, payload: "close"},

        // worker 2 will get closed because of timeout

    }

    for _, r := range reqs {

        r.Handle()

    }

    time.Sleep(75*time.Millisecond)

    r := &Req{id: 3, payload: "quux"}

    r.Handle()

    fmt.Println("worker 2 should get closed by timeout")

    time.Sleep(75*time.Millisecond)

    fmt.Println("worker 3 should get closed by shutdown")

    Shutdown()

}



查看完整回答
反對(duì) 回復(fù) 2021-10-25
?
紫衣仙女

TA貢獻(xiàn)1839條經(jīng)驗(yàn) 獲得超15個(gè)贊

需要保持一個(gè)全局互斥鎖來鎖定對(duì)并發(fā)請(qǐng)求發(fā)生的映射的訪問,然后從那里使用互斥鎖或計(jì)數(shù)器,然后確保它沒有死鎖,然后垃圾收集(或仔細(xì)引用計(jì)數(shù))舊請(qǐng)求條目

這似乎過于復(fù)雜了。這是我將如何做到的:

  • 所有地圖內(nèi)容都應(yīng)該由一個(gè)線程(您的調(diào)度程序)處理,因此您不必處理鎖定。這假設(shè)工作時(shí)間遠(yuǎn)大于調(diào)度時(shí)間。調(diào)度程序跟蹤每個(gè) ID 的通道和計(jì)數(shù)器(顯然在地圖中)。

  • 唯一的復(fù)雜問題是如何處理“goroutine 認(rèn)為它已經(jīng)完成了 ID 的工作”“調(diào)度員剛剛發(fā)現(xiàn)更多工作”的競(jìng)爭(zhēng)。答案是工作人員請(qǐng)求清理,但調(diào)度員決定清理請(qǐng)求是否可能。

所以這里是代碼的工作方式:

1) 調(diào)度進(jìn)程從單個(gè)輸入通道讀取。它獲得兩種類型的請(qǐng)求:“新工作”(來自外部)和“完成工作”(來自工作人員)。兩個(gè)請(qǐng)求都包含一個(gè) ID。

2) Dispatcher 收到“New Work”消息:通過 ID 在地圖中查找。如果您找到一個(gè)頻道 + 一個(gè)計(jì)數(shù),則將作品發(fā)送到該頻道并增加計(jì)數(shù)。(*) 如果你什么也沒找到,在地圖中創(chuàng)建一個(gè)新的通道 + 計(jì)數(shù),將工作發(fā)送到通道(也增加計(jì)數(shù)),然后在該通道上創(chuàng)建一個(gè)工作程序(go-routine)讀取。

3)worker goroutine 顯然會(huì)從通道中拉出“新工作”并完成工作。完成后,它將向 Dispatcher 發(fā)送“完成工作”請(qǐng)求。

4) 調(diào)度員收到“完成工作”消息。在地圖中查找并找到頻道 + 計(jì)數(shù)器。減少計(jì)數(shù)器。如果為零,則向工作人員發(fā)送“退出”消息,并刪除地圖中的條目。

5) 如果工作 goroutine 收到“退出”消息(而不是工作消息),它就會(huì)直接退出。(請(qǐng)注意,當(dāng)舊工人退出時(shí),可以在該 ID 上創(chuàng)建第二個(gè)工人的競(jìng)爭(zhēng)很小。但舊工人只會(huì)處理退出消息,所以沒關(guān)系。舊工人會(huì)清理自己向上,包括舊頻道。)

如果您的請(qǐng)求足夠慢,則地圖中一次將只有一個(gè)條目。另一個(gè)極端是,如果您對(duì)同一 ID 的請(qǐng)求足夠快,則該 ID 的通道將保持活動(dòng)狀態(tài)(只是計(jì)數(shù)器會(huì)上下波動(dòng))。

(*) 注意:如果您將頻道設(shè)置為 5 深,并且 6 條消息排隊(duì),調(diào)度程序?qū)⑼V埂N艺J(rèn)為在這種情況下您可以擴(kuò)大頻道深度,但我不確定。


查看完整回答
反對(duì) 回復(fù) 2021-10-25
  • 3 回答
  • 0 關(guān)注
  • 190 瀏覽

添加回答

舉報(bào)

0/150
提交
取消
微信客服

購課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)