芹菜任務(wù)被“接收”意味著什么?當(dāng)所有 celery 工作線程都被阻塞時(shí),未“收到”的新任務(wù)會發(fā)生什么
拉莫斯之舞
2023-12-09 15:41:12
我正在開發(fā)一個(gè)新的監(jiān)控系統(tǒng),該系統(tǒng)可以測量 Celery 隊(duì)列吞吐量,并在隊(duì)列備份時(shí)幫助向團(tuán)隊(duì)發(fā)出警報(bào)。在我的工作過程中,我遇到了一些我不理解的奇怪行為(并且在 Celery 規(guī)范中沒有詳細(xì)記錄)。出于測試目的,我設(shè)置了一個(gè)端點(diǎn),該端點(diǎn)將使用 16 個(gè)可用于模擬備份隊(duì)列的長時(shí)間運(yùn)行的任務(wù)填充隊(duì)列。框架是Flask,隊(duì)列代理是Redis。Celery 配置為每個(gè)工作人員可以并行處理最多 4 個(gè)任務(wù),而我有 2 個(gè)工作人員正在運(yùn)行。api/health.pydef health(): health = Blueprint("health", __name__) @health.route("/api/debug/create-long-queue", methods=["GET"]) def long_queue(): for i in range(16): sleepy_job.delay() return make_response({}, 200) return healthjobs.py@celery.task(priority=HIGH_PRIORITY)def sleepy_job(*args, **kwargs): time.sleep(30)以下是我模擬備份生產(chǎn)隊(duì)列的方法:我打電話/api/debug/create-long-queue來模擬隊(duì)列中的備份。根據(jù)上面的計(jì)算,每個(gè)工人應(yīng)該忙著睡覺 1 分鐘(總共可以同時(shí)處理 8 個(gè)任務(wù)。每個(gè)任務(wù)只休眠 30 秒,總共 16 個(gè)任務(wù)。)不久后(< 5 秒)我進(jìn)行了另一個(gè) API 調(diào)用,這會啟動具有實(shí)際業(yè)務(wù)邏輯的不同工作(處理入站 Webhook API 調(diào)用)。我們稱這個(gè)工作為handle_incoming_message。這是我看到的使用花來檢查隊(duì)列的內(nèi)容:雖然所有工作線程都被前 8 個(gè)任務(wù)阻塞,但我在隊(duì)列中看 sleepy_job不到新任務(wù)的跡象 ,盡管我確信第二個(gè) API 調(diào)用已被調(diào)用。handle_incoming_messagehandle_incoming_message.delay()前 8 個(gè)sleepy_job任務(wù)完成后(約 30 秒),我 handle_incoming_message在隊(duì)列中看到新任務(wù),狀態(tài)為RECIEVED。在第二個(gè)(也是最后一個(gè))8 個(gè)sleepy_job任務(wù)完成后,我現(xiàn)在看到handle_incoming_message了狀態(tài)STARTED(當(dāng) UI 使用該任務(wù)中接收和處理的新數(shù)據(jù)進(jìn)行更新時(shí),我可以確認(rèn)這一點(diǎn)。)問題因此,很明顯,當(dāng)工作人員在處理完前 8 個(gè)sleepy_job任務(wù)后暫時(shí)解除阻塞時(shí),他們正在以一種可見的方式做一些事情來標(biāo)記/確認(rèn)新任務(wù)。但這留下了幾個(gè)懸而未決的問題:handle_incoming_messagehandle_incoming_message當(dāng)worker被阻塞時(shí),新任務(wù)的狀態(tài)如何?工作人員解除封鎖后發(fā)生了哪些變化,使得它現(xiàn)在可以了解新handle_incoming_message任務(wù)?“已接收”狀態(tài)實(shí)際上意味著什么?(獎(jiǎng)勵(lì):如何在工作人員被阻塞時(shí)了解排隊(duì)的任務(wù)?)
查看完整描述