我正在使用来自RabbitMQ频道的消息,我希望我一次可以消耗n个元素.我想我可以使用ProcessPoolExecutor(或ThreadPoolExecutor).
我只是想知道是否可以知道池中是否有免费执行程序.
这就是我想写的:
executor = futures.ProcessPoolExecutor(max_workers=5)
running = []
def consume(message):
print "actually consuming a single message"
def on_message(channel,method_frame,header_frame,message):
# this method is called once per incoming message
future = executor.submit(consume,message)
block_until_a_free_worker(executor,future)
def block_until_a_free_worker(executor,future):
running.append(future) # this grows forever!
futures.wait(running,timeout=5,return_when=futures.FIRST_COMPLETED)
[...]
channel.basic_consume(on_message,'my_queue')
channel.start_consuming()
我需要编写函数block_until_a_free_worker.
此方法应该能够检查是否所有正在运行的工作程序都在使用中.
在替代方案中,我可以使用任何阻塞executor.submit选项(如果可用).
我尝试了一种不同的方法,并在完成后改变期货清单.
我试图从列表中明确添加和删除期货,然后像这样等待:
futures.wait(running,return_when=futures.FIRST_COMPLETED)
这似乎不是解决方案.
我可以设置future.add_done_callback,并可能计算正在运行的实例…
任何提示或想法?
谢谢.
最佳答案
我给了类似的答案here.
原文链接:https://www.f2er.com/python/438677.html信号量用于限制对一组工作者的资源访问.
from threading import Semaphore
from concurrent.futures import ProcessPoolExecutor
class TaskManager:
def __init__(self,workers):
self.pool = ProcessPoolExecutor(max_workers=workers)
self.workers = Semaphore(workers)
def new_task(self,function):
"""Start a new task,blocks if all workers are busy."""
self.workers.acquire() # flag a worker as busy
future = self.pool.submit(function,... )
future.add_task_done(self.task_done)
def task_done(self,future):
"""Called once task is done,releases one worker."""
self.workers.release()