class Utilrb::ThreadPool
ThreadPool
implementation inspired by github.com/meh/ruby-threadpool
@example Using a thread pool of 10 threads
pool = ThreadPool.new(10) 0.upto(9) do pool.process do sleep 1 puts "done" end end pool.shutdown pool.join
@author Alexander Duda <Alexander.Duda@dfki.de>
Attributes
Auto trim automatically reduces the number of worker threads if there are too many threads waiting for work. @return [Boolean]
The average execution time of a (running) task.
@return [Float]
The average waiting time of a task before being executed.
@return [Float]
The maximum number of worker threads.
@return [Fixnum]
The minimum number of worker threads.
@return [Fixnum]
The real number of worker threads.
@return [Fixnum]
The number of worker threads waiting for work.
@return [Fixnum]
Public Class Methods
@param [Fixnum] min the minimum number of threads @param [Fixnum] max the maximum number of threads
# File lib/utilrb/thread_pool.rb, line 293 def initialize (min = 5, max = min) @min = min @max = max @cond = ConditionVariable.new @cond_sync_key = ConditionVariable.new @mutex = Mutex.new @tasks_waiting = [] # tasks waiting for execution @tasks_running = [] # tasks which are currently running # Statistics @avg_run_time = 0 # average run time of a task in s [Float] @avg_wait_time = 0 # average time a task has to wait for execution in s [Float] @workers = [] # thread pool @spawned = 0 @waiting = 0 @shutdown = false @callback_on_task_finished = nil @pipes = nil @sync_keys = Set.new @trim_requests = 0 @auto_trim = false @mutex.synchronize do min.times do spawn_thread end end end
Private Class Methods
# File lib/utilrb/thread_pool.rb, line 619 def self.report_exception(msg, e) if msg STDERR.puts msg end STDERR.puts e.message STDERR.puts " #{e.backtrace.join("\n ")}" end
Public Instance Methods
Processes the given {Task} as soon as the next thread is available
@param [Task] task The task. @return [Task]
# File lib/utilrb/thread_pool.rb, line 486 def <<(task) raise "cannot add task #{task} it is still running" if task.thread task.reset if task.finished? @mutex.synchronize do if shutdown? raise "unable to add work while shutting down" end task.queued_at = Time.now @tasks_waiting << task if @waiting <= @tasks_waiting.size && @spawned < @max spawn_thread end @cond.signal end task end
Number of tasks waiting for execution
@return [Fixnum] the number of tasks
# File lib/utilrb/thread_pool.rb, line 375 def backlog @mutex.synchronize do @tasks_waiting.length end end
# File lib/utilrb/thread_pool.rb, line 343 def clear shutdown join rescue Exception ensure @shutdown = false end
Blocks until all threads were terminated. This does not terminate any thread by itself and will block for ever if shutdown was not called.
# File lib/utilrb/thread_pool.rb, line 529 def join while true if w = @mutex.synchronize { @workers.first } w.join else break end end self end
sets the maximum number of threads
# File lib/utilrb/thread_pool.rb, line 332 def max=(val) resize(min,val) end
sets the minimum number of threads
# File lib/utilrb/thread_pool.rb, line 327 def min=(val) resize(val,max) end
Given code block is called for every task which was finished even it was terminated.
This can be used to store the result for an event loop
@yield [Task] the code block
# File lib/utilrb/thread_pool.rb, line 546 def on_task_finished (&block) @mutex.synchronize do @callback_on_task_finished = block end end
Processes the given block as soon as the next thread is available.
@param [Array] args the block arguments @yield [*args] the block @return [Task]
# File lib/utilrb/thread_pool.rb, line 395 def process (*args, &block) process_with_options(nil,*args,&block) end
Returns true if a worker thread is currently processing a task and no work is queued
@return [Boolean]
# File lib/utilrb/thread_pool.rb, line 403 def process? @mutex.synchronize do waiting != spawned || @tasks_waiting.length > 0 end end
Processes the given block as soon as the next thread is available with the given options.
@param (see Task#initialize) @option (see Task#initialize) @return [Task]
# File lib/utilrb/thread_pool.rb, line 415 def process_with_options(options,*args, &block) task = Task.new(options,*args, &block) self << task task end
Changes the minimum and maximum number of threads
@param [Fixnum] min the minimum number of threads @param [Fixnum] max the maximum number of threads
# File lib/utilrb/thread_pool.rb, line 360 def resize (min, max = nil) @mutex.synchronize do @min = min @max = max || min count = [@tasks_waiting.size,@max - @spawned].min count.times do spawn_thread end end trim true end
Shuts down all threads.
# File lib/utilrb/thread_pool.rb, line 517 def shutdown tasks = nil @mutex.synchronize do @shutdown = true @cond.broadcast end end
Checks if the thread pool is shutting down all threads.
@return [boolean]
# File lib/utilrb/thread_pool.rb, line 354 def shutdown?; @shutdown; end
Processes the given block from current thread but insures that during processing no worker thread is executing a task which has the same sync_key.
This is useful for instance member calls which are not thread safe.
@param [Object] sync_key The sync key @yield [*args] the code block block @return [Object] The result of the code block
# File lib/utilrb/thread_pool.rb, line 431 def sync(sync_key,*args,&block) raise ArgumentError,"no sync key" unless sync_key @mutex.synchronize do while(!@sync_keys.add?(sync_key)) @cond_sync_key.wait @mutex #wait until someone has removed a key end end begin result = block.call(*args) ensure @mutex.synchronize do @sync_keys.delete sync_key end @cond_sync_key.signal @cond.signal # worker threads are just waiting for work no matter if it is # because of a deletion of a sync_key or a task was added end result end
returns the current used sync_keys
# File lib/utilrb/thread_pool.rb, line 337 def sync_keys @mutex.synchronize do @sync_keys.clone end end
Same as sync but raises Timeout::Error if sync_key cannot be obtained after the given execution time.
@param [Object] sync_key The sync key @param [Float] timeout The timeout @yield [*args] the code block block @return [Object] The result of the code block
# File lib/utilrb/thread_pool.rb, line 459 def sync_timeout(sync_key,timeout,*args,&block) raise ArgumentError,"no sync key" unless sync_key Timeout::timeout(timeout) do @mutex.synchronize do while(!@sync_keys.add?(sync_key)) @cond_sync_key.wait @mutex #wait until someone has removed a key end end end begin result = block.call(*args) ensure @mutex.synchronize do @sync_keys.delete sync_key end @cond_sync_key.signal @cond.signal # worker threads are just waiting for work no matter if it is # because of a deletion of a sync_key or a task was added end result end
Returns an array of the current waiting and running tasks
@return [Array<Task>] The tasks
# File lib/utilrb/thread_pool.rb, line 384 def tasks @mutex.synchronize do @tasks_running.dup + @tasks_waiting.dup end end
Trims the number of threads if threads are waiting for work and the number of spawned threads is higher than the minimum number.
@param [boolean] force Trim even if no thread is waiting.
# File lib/utilrb/thread_pool.rb, line 507 def trim (force = false) @mutex.synchronize do @trim_requests += 1 @cond.signal end self end
Private Instance Methods
calculates the moving average
# File lib/utilrb/thread_pool.rb, line 555 def moving_average(current_val,new_val) return new_val if current_val == 0 current_val * 0.95 + new_val * 0.05 end
spawns a worker thread must be called from a synchronized block
# File lib/utilrb/thread_pool.rb, line 562 def spawn_thread thread = Thread.new do while !shutdown? do current_task = @mutex.synchronize do while !shutdown? task = @tasks_waiting.each_with_index do |t,i| if !t.sync_key || @sync_keys.add?(t.sync_key) @tasks_waiting.delete_at(i) t.pre_execute(self) # block tasks so that no one is using it at the same time @tasks_running << t @avg_wait_time = moving_average(@avg_wait_time,(Time.now-t.queued_at)) break t end end break task unless task.is_a? Array if @spawned > @min && (auto_trim || @trim_requests > 0) if @trim_requests > 0 @trim_requests -= 1 end break end @waiting += 1 @cond.wait @mutex @waiting -= 1 end or break end or break begin current_task.execute rescue Exception => e ThreadPool.report_exception(nil, e) ensure @mutex.synchronize do @tasks_running.delete current_task if current_task.sync_key @sync_keys.delete(current_task.sync_key) @cond_sync_key.signal @cond.signal # maybe another thread is waiting for a sync key end @avg_run_time = moving_average(@avg_run_time,(current_task.stopped_at-current_task.started_at)) end current_task.finalize # propagate state after it was deleted from the internal lists @callback_on_task_finished.call(current_task) if @callback_on_task_finished end end @mutex.synchronize do @spawned -= 1 @workers.delete thread end end @spawned += 1 @workers << thread rescue Exception => e ThreadPool.report_exception(nil, e) end