3 回答

TA貢獻1821條經(jīng)驗 獲得超6個贊
這個問題在google網(wǎng)上論壇上得到了回答。
我不是作者,所有功勞歸Jean Jean
這是一個適當?shù)慕鉀Q方案。確認可以正常工作,在我的場景中,我將“定期任務(wù)”子類化,并根據(jù)它創(chuàng)建了一個模型,因為我可以根據(jù)需要向模型中添加其他字段,也可以添加“終止”方法。您必須將定期任務(wù)的enabled屬性設(shè)置為False并保存,然后再將其刪除。整個子類不是必須的,schedule_every方法是真正起作用的方法。當您準備終止任務(wù)時(如果您沒有將其子類化),您可以使用PeriodicTask.objects.filter(name = ...)搜索任務(wù),將其禁用,然后將其刪除。
希望這可以幫助!
from djcelery.models import PeriodicTask, IntervalSchedule
from datetime import datetime
class TaskScheduler(models.Model):
periodic_task = models.ForeignKey(PeriodicTask)
@staticmethod
def schedule_every(task_name, period, every, args=None, kwargs=None):
""" schedules a task by name every "every" "period". So an example call would be:
TaskScheduler('mycustomtask', 'seconds', 30, [1,2,3])
that would schedule your custom task to run every 30 seconds with the arguments 1,2 and 3 passed to the actual task.
"""
permissible_periods = ['days', 'hours', 'minutes', 'seconds']
if period not in permissible_periods:
raise Exception('Invalid period specified')
# create the periodic task and the interval
ptask_name = "%s_%s" % (task_name, datetime.datetime.now()) # create some name for the period task
interval_schedules = IntervalSchedule.objects.filter(period=period, every=every)
if interval_schedules: # just check if interval schedules exist like that already and reuse em
interval_schedule = interval_schedules[0]
else: # create a brand new interval schedule
interval_schedule = IntervalSchedule()
interval_schedule.every = every # should check to make sure this is a positive int
interval_schedule.period = period
interval_schedule.save()
ptask = PeriodicTask(name=ptask_name, task=task_name, interval=interval_schedule)
if args:
ptask.args = args
if kwargs:
ptask.kwargs = kwargs
ptask.save()
return TaskScheduler.objects.create(periodic_task=ptask)
def stop(self):
"""pauses the task"""
ptask = self.periodic_task
ptask.enabled = False
ptask.save()
def start(self):
"""starts the task"""
ptask = self.periodic_task
ptask.enabled = True
ptask.save()
def terminate(self):
self.stop()
ptask = self.periodic_task
self.delete()
ptask.delete()

TA貢獻1789條經(jīng)驗 獲得超10個贊
有一個名為django-celery-beat的庫,它提供了所需的模型。為了使其能夠動態(tài)加載新的定期任務(wù),必須創(chuàng)建自己的Scheduler。
from django_celery_beat.schedulers import DatabaseScheduler
class AutoUpdateScheduler(DatabaseScheduler):
def tick(self, *args, **kwargs):
if self.schedule_changed():
print('resetting heap')
self.sync()
self._heap = None
new_schedule = self.all_as_schedule()
if new_schedule:
to_add = new_schedule.keys() - self.schedule.keys()
to_remove = self.schedule.keys() - new_schedule.keys()
for key in to_add:
self.schedule[key] = new_schedule[key]
for key in to_remove:
del self.schedule[key]
super(AutoUpdateScheduler, self).tick(*args, **kwargs)
@property
def schedule(self):
if not self._initial_read and not self._schedule:
self._initial_read = True
self._schedule = self.all_as_schedule()
return self._s
添加回答
舉報