我對芹菜有新手的經(jīng)驗。我寫過很多任務,既有預定的也有延遲的,但僅此而已。我遇到了一個問題,我想創(chuàng)建一個任務來啟動 1000 個較小的作業(yè),以減輕隊列長度和可能需要數(shù)小時才能完成的作業(yè)可能出現(xiàn)的任何問題。當前應用程序依賴于來自外部 API 的信息??梢赃@么說,用戶將他們的帳戶與我集成的另一項服務相關(guān)聯(lián),我想每天根據(jù)他們的外部帳戶的變化來更新用戶的信息。我有這樣的預定工作@app.task() def refresh_accounts(): for account in Account.objects.all(): response = retrieve_account_info(account_id=account.id) account.data = response.data account.save() --我想要的是這樣的@app.task()def kickoff_refresh(): for account in Account.objects.all() refresh_account.delay(account_id=account.id)@app.task() def refresh_account(account_id=None): account = Account.objects.get(id=account_id) response = retrieve_account_info(account_id=account.id) account.data = response.data account.save()我想到的一種方法是kickoff_refresh在refresh_account不同的隊列中。@app.task(queue=q1), @app.task(queue=q2)... 但是,我不知道是否有更好的方法。在同一隊列的任務中調(diào)用任務在 celery 中似乎是不好的做法 - https://docs.celeryproject.org/en/latest/userguide/tasks.html#avoid-launching-synchronous-subtasks任務kickoff_refresh將是每隔幾個小時運行一次周期性任務。我很想聽聽什么對其他人有用。謝謝
1 回答

幕布斯7119047
TA貢獻1794條經(jīng)驗 獲得超8個贊
from celery import group
@app.task()
def kickoff_refresh(account_id=None):
job = group(refresh_account.s(account_id=account.id) for account in Account.objects.all())()
@app.task()
def refresh_account(account_id=None):
account = Account.objects.get(id=account_id)
response = retrieve_account_info(account_id=account.id)
account.data = response.data
account.save()
添加回答
舉報
0/150
提交
取消