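I have a function that takes a large number of x,y pairs as input, does some elaborate curve fitting with numpy and scipy, and returns a single value. To try to speed things up, I am feeding the data to two threads through a Queue.Queue. Once all the data has been processed, I want the threads to terminate, the calling process to end, and control to return to the shell.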
I am trying to understand why I have to use a private method of threading.Thread to stop my threads and return control to the command line.
self.join() does not end the program. The only way to get control back is to use the private stop method:
    def stop(self):
        print "STOP CALLED"
        self.finished.set()
        print "SET DONE"
        # self.join(timeout=None) does not work
        self._Thread__stop()
Here is an approximation of my code:
    import threading
    from Queue import Queue  # Python 2 (the module is named queue in Python 3)

    class CalcThread(threading.Thread):
        def __init__(self, in_queue, out_queue, function):
            threading.Thread.__init__(self)
            self.in_queue = in_queue
            self.out_queue = out_queue
            self.function = function
            self.finished = threading.Event()

        def stop(self):
            print "STOP CALLED"
            self.finished.set()
            print "SET DONE"
            self._Thread__stop()

        def run(self):
            while not self.finished.is_set():
                params_for_function = self.in_queue.get()
                try:
                    tm = self.function(**params_for_function)
                    self.in_queue.task_done()
                    self.out_queue.put(tm)
                except ValueError:
                    # modify params and reinsert into queue
                    window = params_for_function["window"]
                    params_for_function["window"] = window + 1
                    self.in_queue.put(params_for_function)
                    # the failed item was still fetched, so mark it done,
                    # otherwise in_queue.join() never returns
                    self.in_queue.task_done()

    def big_calculation(well_id, window, data_arrays):
        # do some analysis to calculate tm
        return tm

    if __name__ == "__main__":
        # options, well_ids and my_data_dict are defined elsewhere in the real script
        NUM_THREADS = 2
        workers = []
        in_queue = Queue()
        out_queue = Queue()

        for i in range(NUM_THREADS):
            w = CalcThread(in_queue, out_queue, big_calculation)
            w.start()
            workers.append(w)

        if options.analyze_all:
            for i in well_ids:
                in_queue.put(dict(well_id=i, window=10, data_arrays=my_data_dict))

        in_queue.join()
        print "ALL THREADS SEEM TO BE DONE"

        # gather data and report it from out_queue
        for i in well_ids:
            p = out_queue.get()
            print p
            out_queue.task_done()
            # I had to do this to get the out_queue to proceed
            if out_queue.qsize() == 0:
                out_queue.join()
                break

        # Calling this stop method does not seem to return control to the
        # command line unless I use the threading.Thread private method
        for aworker in workers:
            aworker.stop()
Solution
In general, it is a bad idea to kill a thread that modifies shared resources.
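A likely reason self.join() hangs in the question's code: once the queue is drained, each worker sits blocked in in_queue.get(), so it never returns to the "while not self.finished" check, and because the threads are non-daemon the interpreter waits for them at exit. Instead of killing the thread, it can be asked to stop itself. The sketch below (Python 3, a swapped-in technique rather than the answer's own code) keeps the question's Event-based stop() but uses get() with a timeout so the flag is re-checked periodically. The poll_interval parameter is hypothetical, and the ValueError retry logic is left out for brevity. Putting one sentinel object per worker into the queue is a common alternative that avoids polling.

```python
import queue
import threading


class CalcThread(threading.Thread):
    """Worker that can be stopped cooperatively, without Thread private methods."""

    def __init__(self, in_queue, out_queue, function, poll_interval=0.1):
        super().__init__()
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.function = function
        self.poll_interval = poll_interval  # hypothetical: seconds between flag checks
        self.finished = threading.Event()

    def stop(self):
        # Only requests an exit; run() notices within poll_interval seconds.
        self.finished.set()

    def run(self):
        while not self.finished.is_set():
            try:
                # A plain get() would block forever on an empty queue,
                # so the finished flag would never be re-checked.
                params = self.in_queue.get(timeout=self.poll_interval)
            except queue.Empty:
                continue
            try:
                self.out_queue.put(self.function(**params))
            finally:
                # Mark the item done even if function() raised,
                # so in_queue.join() can still return.
                self.in_queue.task_done()


if __name__ == "__main__":
    in_q, out_q = queue.Queue(), queue.Queue()
    workers = [CalcThread(in_q, out_q, lambda well_id, window: well_id * window)
               for _ in range(2)]
    for w in workers:
        w.start()
    for i in range(4):
        in_q.put(dict(well_id=i, window=10))
    in_q.join()        # every item has been processed
    for w in workers:
        w.stop()
    for w in workers:
        w.join()       # returns: each run() loop sees the flag and exits
    print(sorted(out_q.get() for _ in range(4)))  # [0, 10, 20, 30]
```

With this shape, join() returns normally and the script exits back to the shell.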
In Python, running CPU-bound tasks in multiple threads is worse than useless unless the computation releases the GIL. Many numpy functions do release the GIL.
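A rough way to check whether threads help for a particular function is to time it serially and in a thread pool. In this sketch, cpu_bound() and compare() are hypothetical helpers: cpu_bound() is a pure-Python loop that, on a standard (GIL) CPython build, holds the GIL throughout, so the threaded run should not be faster than the serial one. Passing the real fitting function to compare() instead would show whether its numpy/scipy calls release the GIL enough to overlap.

```python
import concurrent.futures
import time


def cpu_bound(n):
    # Pure-Python arithmetic: holds the GIL, so threads cannot run it in parallel.
    total = 0
    for i in range(n):
        total += i * i
    return total


def compare(func, inputs, workers=2):
    """Return (serial_results, threaded_results, serial_seconds, threaded_seconds)."""
    start = time.perf_counter()
    serial = [func(x) for x in inputs]
    serial_s = time.perf_counter() - start

    start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        threaded = list(executor.map(func, inputs))  # map keeps input order
    threaded_s = time.perf_counter() - start
    return serial, threaded, serial_s, threaded_s


if __name__ == "__main__":
    s, t, s_sec, t_sec = compare(cpu_bound, [200_000] * 4)
    assert s == t
    print("serial %.3fs, threaded %.3fs" % (s_sec, t_sec))
```

Timings vary by machine, so treat a single run as indicative only.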
Adapted from the ThreadPoolExecutor example in the docs:
    import concurrent.futures  # on Python 2.x: pip install futures

    calc_args = []
    if options.analyze_all:
        calc_args.extend(dict(well_id=i, ...) for i in well_ids)

    with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
        future_to_args = dict((executor.submit(big_calculation, **args), args)
                              for args in calc_args)

        while future_to_args:
            # iterate over a snapshot: resubmitted futures are added to the dict
            for future in concurrent.futures.as_completed(list(future_to_args)):
                args = future_to_args.pop(future)
                if future.exception() is not None:
                    print('%r generated an exception: %s' % (args, future.exception()))
                    if isinstance(future.exception(), ValueError):
                        # modify params and resubmit
                        args["window"] += 1
                        future_to_args[executor.submit(big_calculation, **args)] = args
                else:
                    print('%r returned %r' % (args, future.result()))

    print("ALL work SEEMs TO BE DONE")
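Because the snippet above depends on options, well_ids and the real big_calculation, here is a self-contained sketch of the same resubmit-on-ValueError loop. The big_calculation below is a hypothetical stand-in that fails until the window reaches 12, and run_all() is a hypothetical wrapper. Unlike the answer's version, it builds a new dict for each retry instead of mutating the original, and re-raises any exception other than ValueError rather than just printing it.

```python
import concurrent.futures

NUM_THREADS = 2


def big_calculation(well_id, window, data_arrays):
    # Hypothetical stand-in for the real curve fit: fails until the window is wide enough.
    if window < 12:
        raise ValueError("window %d too small" % window)
    return (well_id, window)


def run_all(calc_args, max_workers=NUM_THREADS):
    """Run every job, resubmitting with window + 1 on ValueError; return {well_id: result}."""
    results = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        pending = {executor.submit(big_calculation, **args): args for args in calc_args}
        while pending:
            # Snapshot the keys so resubmitted futures can be added to `pending`.
            for future in concurrent.futures.as_completed(list(pending)):
                args = pending.pop(future)
                exc = future.exception()
                if isinstance(exc, ValueError):
                    retry = dict(args, window=args["window"] + 1)
                    pending[executor.submit(big_calculation, **retry)] = retry
                elif exc is not None:
                    raise exc
                else:
                    results[args["well_id"]] = future.result()
    return results


if __name__ == "__main__":
    jobs = [dict(well_id=i, window=10, data_arrays=None) for i in range(3)]
    print(run_all(jobs))
```

Leaving the with block shuts the executor down and waits for its threads, so no explicit stop() is needed.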
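If there is no shared state, you can replace ThreadPoolExecutor with ProcessPoolExecutor. In that case, put the code in a main() function and call it from an if __name__ == "__main__": block.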
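A minimal sketch of that swap, assuming the work function needs no shared state. fit_one(), run() and main() are hypothetical names. The submission code is identical for both executor classes; only the class passed to run() changes. The worker function lives at module level so ProcessPoolExecutor can pickle it, and the __main__ guard keeps child processes started with the spawn method (the default on Windows and macOS) from re-running main() when they import the module.

```python
import concurrent.futures


def fit_one(params):
    # Hypothetical stand-in for big_calculation; module-level so it can be pickled.
    return params["well_id"] * params["window"]


def run(executor_cls, jobs, max_workers=2):
    # Works unchanged with ThreadPoolExecutor or ProcessPoolExecutor.
    with executor_cls(max_workers=max_workers) as executor:
        return list(executor.map(fit_one, jobs))


def main():
    jobs = [dict(well_id=i, window=10) for i in range(4)]
    # Each worker process has its own interpreter and GIL; arguments and
    # results are pickled when they cross the process boundary.
    print(run(concurrent.futures.ProcessPoolExecutor, jobs))  # prints [0, 10, 20, 30]


if __name__ == "__main__":
    main()
```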