问题:
当将1000个任务发送到apply_async时,它们在所有48个cpu上并行运行,但有时运行的cpu越来越少,直到只剩下一个cpu运行,并且只有当最后一个cpu完成其任务时,所有cpu才会继续运行每个人都有一个新任务.它不应该等待像这样的任何“任务批处理”..
我的(简化)代码:
from multiprocessing import Pool
pool = Pool(47)
tasks = [pool.apply_async(json2features,(j,)) for j in jsons]
feats = [t.get() for t in tasks]
jsons = […]是已加载到内存并解析为对象的大约1000个JSON的列表.
json2features(json)在json上执行一些cpu繁重的工作,并返回一个数字数组.
此功能可能需要1秒到15分钟才能运行,因此我使用启发式方法对jsons进行排序.希望最长的任务首先在列表中,因此首先开始.
json2features函数还会在任务完成时以及花费的时间内打印.它全部运行在一个拥有48个核心的ubuntu服务器上,就像我上面所说的那样,使用全部47个核心,它开始很棒.然后,当任务完成时,运行的核心越来越少,这听起来完全没问题,不是因为在最后一个核心完成之后(当我看到它打印到stdout时),所有cpu都开始在新任务上再次运行,这意味着这不是真正的清单结束.它可能会再次执行相同的操作,然后再次执行列表的实际结束.
有时它可以在5分钟内只使用一个核心,当任务最终完成时,它会在新任务上再次开始使用所有核心. (所以它不会停留在某些IPC开销上)
没有重复的jsons,也没有任何依赖关系(它们都是静态的,新鲜的磁盘数据,没有引用等等),也没有json2features调用之间的任何依赖关系(没有全局状态或任何东西),除了它们使用相同的终端他们的印刷品.
我怀疑问题是工作人员在调用get结果之前不会被释放,所以我尝试了以下代码:
from multiprocessing import Pool
pool = Pool(47)
tasks = [pool.apply_async(print,(i,)) for i in range(1000)]
# feats = [t.get() for t in tasks]
并且它会打印所有1000个数字,即使没有调用get.
我现在已经没想到问题可能是什么了.
这真的是Pool的正常行为吗?
非常感谢!
通常在Unix上,默认管道大小范围为4到64 Kb.如果您提供的JSON大小很大,您可能会在任何给定的时间点堵塞管道.
这意味着,当其中一名工人忙于从管道中读取大型JSON时,所有其他工作人员都会饿死.
通过IPC共享大数据通常是一种不好的做法,因为它会导致性能不佳.这在multiprocessing programming guidelines中甚至得到了强调.
Avoid shared state
As far as possible one should try to avoid shifting large amounts of data between processes.
不要在主进程中读取JSON文件,只需向工作人员发送文件名,然后让他们打开并阅读内容.您肯定会注意到性能的提高,因为您也在并发域中移动JSON加载阶段.
请注意,结果也是如此.单个os.pipe也用于将结果返回到主进程.如果一个或多个工作人员阻塞了结果管道,那么您将获得等待主管道排除它的所有进程.应将大结果写入文件.然后,您可以利用主进程上的多线程快速回读文件中的结果.