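I couldn't find a decent ThreadPool implementation for Ruby, so I wrote my own. It is partly based on the code from http://snippets.dzone.com/posts/show/3276, but changed to use wait/signal, with an added implementation of ThreadPool shutdown. However, after running for a while (with 100 threads and about 1300 tasks processed), it dies on line 25, where a worker waits for a new job. Any ideas why this might happen?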
    require 'thread'
    begin
      require 'fastthread'
    rescue LoadError
      $stderr.puts "Using the ruby-core thread implementation"
    end

    class ThreadPool
      class Worker
        def initialize(callback)
          @mutex = Mutex.new
          @cv = ConditionVariable.new
          @callback = callback
          @mutex.synchronize {@running = true}
          @thread = Thread.new do
            while @mutex.synchronize {@running}
              block = get_block
              if block
                block.call
                reset_block
                # Signal the ThreadPool that this worker is ready for another job
                @callback.signal
              else
                # Wait for a new job
                @mutex.synchronize {@cv.wait(@mutex)} # <=== Is this line 25?
              end
            end
          end
        end

        def name
          @thread.inspect
        end

        def get_block
          @mutex.synchronize {@block}
        end

        def set_block(block)
          @mutex.synchronize do
            raise RuntimeError, "Thread already busy." if @block
            @block = block
            # Signal the thread in this class, that there's a job to be done
            @cv.signal
          end
        end

        def reset_block
          @mutex.synchronize {@block = nil}
        end

        def busy?
          @mutex.synchronize {!@block.nil?}
        end

        def stop
          @mutex.synchronize {@running = false}
          # Signal the thread not to wait for a new job
          @cv.signal
          @thread.join
        end
      end

      attr_accessor :max_size

      def initialize(max_size = 10)
        @max_size = max_size
        @workers = []
        @mutex = Mutex.new
        @cv = ConditionVariable.new
      end

      def size
        @mutex.synchronize {@workers.size}
      end

      def busy?
        @mutex.synchronize {@workers.any? {|w| w.busy?}}
      end

      def shutdown
        @mutex.synchronize {@workers.each {|w| w.stop}}
      end
      alias :join :shutdown

      def process(block=nil,&blk)
        block = blk if block_given?
        while true
          @mutex.synchronize do
            worker = get_worker
            if worker
              return worker.set_block(block)
            else
              # Wait for a free worker
              @cv.wait(@mutex)
            end
          end
        end
      end

      # Used by workers to report ready status
      def signal
        @cv.signal
      end

      private
      def get_worker
        free_worker || create_worker
      end

      def free_worker
        @workers.each {|w| return w unless w.busy?}; nil
      end

      def create_worker
        return nil if @workers.size >= @max_size
        worker = Worker.new(self)
        @workers << worker
        worker
      end
    end
Solution
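OK, so the main problem with the implementation is: how do you make sure that no signal is lost, and how do you avoid deadlocks?

In my experience, this is really hard to get right with condition variables and mutexes, but easy with semaphores. It so happens that Ruby implements an object called Queue (or SizedQueue) that should solve the problem. Here is my suggested implementation: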
    require 'thread'
    begin
      require 'fastthread'
    rescue LoadError
      $stderr.puts "Using the ruby-core thread implementation"
    end

    class ThreadPool
      class Worker
        def initialize(thread_queue)
          @mutex = Mutex.new
          @cv = ConditionVariable.new
          @queue = thread_queue
          @running = true
          @thread = Thread.new do
            @mutex.synchronize do
              while @running
                # Wait only while no job is pending, so that a signal sent
                # before this thread starts waiting is not lost
                @cv.wait(@mutex) while @running && @block.nil?
                block = get_block
                if block
                  # Release the lock while the job runs
                  @mutex.unlock
                  block.call
                  @mutex.lock
                  reset_block
                end
                # Report this worker as free
                @queue << self
              end
            end
          end
        end

        def name
          @thread.inspect
        end

        def get_block
          @block
        end

        def set_block(block)
          @mutex.synchronize do
            raise RuntimeError, "Thread already busy." if @block
            @block = block
            # Signal the thread in this class, that there's a job to be done
            @cv.signal
          end
        end

        def reset_block
          @block = nil
        end

        def busy?
          @mutex.synchronize { !@block.nil? }
        end

        def stop
          @mutex.synchronize do
            @running = false
            @cv.signal
          end
          @thread.join
        end
      end

      attr_accessor :max_size

      def initialize(max_size = 10)
        @max_size = max_size
        @queue = Queue.new
        @workers = []
      end

      def size
        @workers.size
      end

      def busy?
        @queue.size < @workers.size
      end

      def shutdown
        @workers.each { |w| w.stop }
        @workers = []
      end
      alias :join :shutdown

      def process(block=nil,&blk)
        block = blk if block_given?
        worker = get_worker
        worker.set_block(block)
      end

      private
      def get_worker
        # Queue#pop blocks until a worker reports itself as free
        if !@queue.empty? or @workers.size == @max_size
          return @queue.pop
        else
          worker = Worker.new(@queue)
          @workers << worker
          worker
        end
      end
    end
And here is some simple test code:
    tp = ThreadPool.new 500
    (1..1000).each { |i| tp.process { (2..10).inject(1) { |memo,val| sleep(0.1); memo*val }; print "Computation #{i} done. Nb of tasks: #{tp.size}\n" } }
    tp.shutdown
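
If a fixed number of threads is acceptable, the Queue idea can be taken one step further: workers pull jobs straight from a single shared Queue, so no condition variable or mutex is needed in user code at all. The following is only a minimal sketch of that variant, not the implementation above. The names `QueuePool` and `STOP` are hypothetical and chosen for illustration, and the sketch assumes that jobs do not raise exceptions (an uncaught exception would end that worker thread).

```ruby
require 'thread'

# Hypothetical minimal pool: a fixed set of workers pulls jobs from one shared Queue.
class QueuePool
  STOP = Object.new # sentinel object telling a worker to exit

  def initialize(size = 10)
    @jobs = Queue.new
    @workers = Array.new(size) do
      Thread.new do
        # Queue#pop blocks until an item is available, so a job queued
        # before the worker starts waiting is never missed
        until (job = @jobs.pop).equal?(STOP)
          job.call
        end
      end
    end
  end

  # Queue a job; returns immediately
  def process(&block)
    @jobs << block
  end

  # Let the workers finish all queued jobs, then stop them
  def shutdown
    @workers.size.times { @jobs << STOP }
    @workers.each(&:join)
  end
end

results = Queue.new
pool = QueuePool.new(4)
(1..20).each { |i| pool.process { results << i * i } }
pool.shutdown
puts results.size # prints 20: every queued job ran before shutdown returned
```

Unlike the pool above, this sketch starts all of its threads up front and `process` never blocks. Replacing `Queue` with `SizedQueue` would make `process` block once a chosen number of jobs is pending.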