我正在处理的应用程序非常异步. Web应用程序根据用户操作通过芹菜运行许多任务.芹菜任务本身能够启动进一步的任务.
def do_sth(): logic(); if condition: function1.apply_async(*args) else: function2.apply_asynch(*args)
现在我们要开始对我们编写的任何新代码进行单元测试,但我们不确定如何执行此操作.我们想在pytest单元测试中断言的是我们想看看function1是否真的被调用了.我们不希望必须运行function1本身,因为我们将对function1进行单元测试.
我不希望将celery作为进程运行,也不想在单元测试会话期间运行任何AMQP代理.
这可以实现吗?
编辑
有人指出,这是How do you unit test a Celery task?的重复
它不是.想一想.我要问的是如何测试函数是否通过apply_async调用了function1.那个问题是我如何测试function1本身.有一个主要的区别.在构建这个问题之前,我确实遇到了这个问题.