我想在另一组任务结束时动态安排Celery的周期性任务.
我知道如何使用Celery创建(静态)周期性任务:
CELERYBEAT_SCHEDULE = {
'poll_actions': {
'task': 'tasks.poll_actions','schedule': timedelta(seconds=5)
}
}
但我想从我的任务中动态创建周期性作业(并且可能有一种方法可以在达到某些条件时停止这些周期性作业(所有任务都已完成).
就像是:
@celery.task
def run(ids):
group(prepare.s(id) for id in ids) | execute.s(ids) | poll.s(ids,schedule=timedelta(seconds=5))
@celery.task
def prepare(id):
...
@celery.task
def execute(id):
...
@celery.task
def poll(ids):
# This task has to be schedulable on demand
...
最佳答案
对此的直接解决方案要求您能够动态添加/删除节拍调度程序条目.截至回答这个问题……
原文链接:https://www.f2er.com/python/438668.htmlHow to dynamically add / remove periodic tasks to Celery (celerybeat)
这是不可能的.我怀疑它在过渡期间是否可用,因为……
你在这里混淆了两个概念. “事件驱动工作”的概念和“批量计划驱动工作”的概念(这实际上只是事件按计划发生的第一种情况).如果你真的考虑到你在做什么,你会发现有一套相当复杂的边缘情况.消息本质上是分布的,当从不同消息产生的组开始创建冲突条目时会发生什么?当你发现自己处于以前预定的kruft山之下时,你会怎么做?
使用消息传递系统时,您真的希望构建递归树.做某事的工作主轴并产生更多信息以做更多事情.除了这些循环(有意或无意),最终实现它们的基本情况并终止.
您实际尝试实现的任何目标的答案都在于在您的消息传递系统和异步工作框架的限制内重新编码您的问题.