class Thread::Pool
A pool is a container of a limited number of threads to which you can add tasks to run.
This is usually more performant and less memory intensive than creating a new thread for every task.
Attributes
If true, tasks will allow raised exceptions to pass through.
Similar to Thread.abort_on_exception
Public Class Methods
Create the pool with minimum and maximum threads.
The pool will start with the minimum number of threads created and will spawn new threads as needed until the maximum is reached.
A default block can be passed, which will be used to {#process} the passed data.
# File lib/thread/pool.rb, line 124
#
# Set up a pool that starts with +min+ worker threads and may grow to +max+.
#
# @param min [Integer] number of workers spawned right away
# @param max [Integer, nil] upper bound on workers; +min+ is used when omitted
# @param block [Proc] optional default block, stored in @block
def initialize(min, max = nil, &block)
  @min   = min
  @max   = max || min
  @block = block

  # Worker coordination: the task queue is guarded by @mutex and workers
  # wait on @cond.
  @mutex = Mutex.new
  @cond  = ConditionVariable.new

  # Separate lock/condition pair used to notify callers blocked in #wait.
  @done_mutex = Mutex.new
  @done       = ConditionVariable.new

  @todo     = []
  @workers  = []
  @timeouts = {}

  @spawned       = 0
  @waiting       = 0
  @trim_requests = 0

  @shutdown  = false
  @auto_trim = false
  @idle_trim = nil

  @mutex.synchronize do
    min.times do |index|
      begin
        spawn_thread
      rescue
        # Thread creation failed: report how many were created and stop trying.
        puts "can't create more than #{index} threads"
        break
      end
    end
  end
end
Public Instance Methods
Enable auto trimming, unneeded threads will be deleted until the minimum is reached.
# File lib/thread/pool.rb, line 170
#
# Switch auto trimming on.
#
# @return [self] the pool, so calls can be chained
def auto_trim!
  tap { @auto_trim = true }
end
Check if auto trimming is enabled.
# File lib/thread/pool.rb, line 164
#
# Tell whether auto trimming is currently switched on.
#
# @return the current value of the @auto_trim flag
def auto_trim?
  @auto_trim
end
Get the amount of tasks that still have to be run.
# File lib/thread/pool.rb, line 217
#
# Count the tasks that are queued but not yet picked up by a worker.
# The queue is read while holding @mutex.
#
# @return [Integer] number of entries in @todo
def backlog
  @mutex.synchronize do
    @todo.size
  end
end
Are all tasks consumed?
# File lib/thread/pool.rb, line 224
#
# Thread-safe check for "all tasks consumed": evaluates _done? while
# holding @mutex.
#
# @return [Boolean]
def done?
  @mutex.synchronize do
    _done?
  end
end
# File lib/thread/future.rb, line 161
#
# Delegate to Thread.future, handing it this pool and the given block.
#
# @return whatever Thread.future returns
def future(&block)
  Thread.future(self, &block)
end
Check if there are idle workers.
# File lib/thread/pool.rb, line 256
#
# Thread-safe check for idle workers: evaluates _idle? while holding @mutex.
#
# @return [Boolean]
def idle?
  @mutex.synchronize do
    _idle?
  end
end
Enable idle trimming. Unneeded threads will be deleted after the given number of seconds of inactivity. The minimum number of threads is respected.
# File lib/thread/pool.rb, line 190
#
# Switch idle trimming on with the given inactivity timeout.
#
# @param timeout idle time, stored in @idle_trim
# @return [self] the pool, so calls can be chained
def idle_trim!(timeout)
  tap { @idle_trim = timeout }
end
Check if idle trimming is enabled.
# File lib/thread/pool.rb, line 184
#
# Tell whether idle trimming is switched on, i.e. whether an idle timeout
# is currently stored in @idle_trim.
#
# @return [Boolean] false only when @idle_trim is nil
def idle_trim?
  !@idle_trim.nil?
end
Disable auto trimming.
# File lib/thread/pool.rb, line 177
#
# Switch auto trimming off.
#
# @return [self] the pool, so calls can be chained
def no_auto_trim!
  tap { @auto_trim = false }
end
Turn off idle trimming.
# File lib/thread/pool.rb, line 197
#
# Switch idle trimming off by clearing the stored idle timeout.
#
# @return [self] the pool, so calls can be chained
def no_idle_trim!
  tap { @idle_trim = nil }
end
Add a task to the pool which will execute the block with the given argument.
If no block is passed the default block will be used if present, an ArgumentError will be raised otherwise.
# File lib/thread/pool.rb, line 267
#
# Queue a new task built from +args+ and a block.
#
# The explicit block wins; otherwise the pool's default block (@block) is
# used. A new worker is spawned when nobody is waiting and the pool is
# still below @max, then one waiting worker is signalled.
#
# @param args arguments forwarded to Task.new after the pool itself
# @return the Task that was queued
# @raise [ArgumentError] when neither a block nor a default block exists
# @raise [RuntimeError] when the pool is shutting down
def process(*args, &block)
  job = block || @block
  raise ArgumentError, 'you must pass a block' unless job

  task = Task.new(self, *args, &job)

  @mutex.synchronize do
    raise 'unable to add work while shutting down' if shutdown?

    @todo << task
    spawn_thread if @waiting == 0 && @spawned < @max
    @cond.signal
  end

  task
end
Resize the pool with the passed arguments.
# File lib/thread/pool.rb, line 209
#
# Change the pool bounds and then force a trim.
#
# @param min [Integer] new minimum number of workers
# @param max [Integer, nil] new maximum; +min+ is used when omitted
# @return the result of trim!
def resize(min, max = nil)
  @min, @max = min, (max || min)

  trim!
end
Shut down the pool; this will block until all tasks have finished running.
# File lib/thread/pool.rb, line 322
#
# Shut the pool down gracefully and block until every worker has exited.
#
# Marks the pool as shutting down (:nicely), wakes all workers, joins them,
# and finally stops the timeout thread if one was started.
#
# @return nil when no timeout thread exists, otherwise the result of
#   joining it
def shutdown
  @mutex.synchronize do
    @shutdown = :nicely
    @cond.broadcast
  end

  # Workers delete themselves from @workers when they finish (see
  # spawn_thread), so keep joining whichever one is first until none remain.
  while (worker = @workers.first)
    worker.join
  end

  return unless @timeout

  # The timeout thread only leaves its loop once @shutdown is :now.
  @shutdown = :now
  wake_up_timeout
  @timeout.join
end
Shut down the pool instantly without finishing to execute tasks.
# File lib/thread/pool.rb, line 310
#
# Request an immediate shutdown: flag the pool as :now, wake every worker
# waiting on @cond, and poke the timeout thread. Does not join anything.
#
# @return [self]
def shutdown!
  @mutex.synchronize do
    @shutdown = :now
    @cond.broadcast
  end

  wake_up_timeout

  self
end
Check if the pool has been shut down.
# File lib/thread/pool.rb, line 159
#
# Tell whether a shutdown has been requested. @shutdown holds false or a
# mode symbol (:nicely / :now); this collapses it to a strict boolean.
#
# @return [Boolean]
def shutdown?
  @shutdown ? true : false
end
Shutdown the pool after a given amount of time.
# File lib/thread/pool.rb, line 344
#
# Schedule a graceful shutdown: a helper thread sleeps for +timeout+ and
# then calls #shutdown. The caller is not blocked.
#
# @param timeout delay passed to sleep
# @return [Thread] the helper thread
def shutdown_after(timeout)
  Thread.new do
    sleep(timeout)
    shutdown
  end
end
Returns the actual amount of workers
# File lib/thread/pool.rb, line 204
#
# Report how many worker threads are currently tracked in @workers.
#
# @return [Integer]
def size
  @workers.length
end
Trim the unused threads; if forced, threads will be trimmed even if there are tasks waiting.
# File lib/thread/pool.rb, line 293
#
# Ask one worker to exit, provided the pool would stay above @min.
#
# Without +force+ a request is only filed when at least one worker is
# waiting. Requests already pending (@trim_requests) count against the
# number of spawned workers.
#
# @param force [Boolean] file the request even if no worker is waiting
# @return [self]
def trim(force = false)
  @mutex.synchronize do
    next unless force || @waiting > 0
    next unless @spawned - @trim_requests > @min

    @trim_requests += 1
    @cond.signal
  end

  self
end
Force {#trim}.
# File lib/thread/pool.rb, line 305
#
# Forced variant of #trim: calls it with force set to true.
#
# @return the result of #trim
def trim!
  trim(true)
end
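Wait until the pool reaches the requested state: `:idle` (the default) waits until there are idle workers, `:done` waits until all tasks are consumed. The caller will be blocked until then.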
# File lib/thread/pool.rb, line 231
#
# Block the caller until the pool reaches the requested state.
#
# @param what [Symbol] :idle (default) waits until idle? holds, :done waits
#   until done? holds; any other value returns immediately
# @return [self]
def wait(what = :idle)
  # Pair of predicates per state: the first takes @mutex itself, the second
  # is the raw check re-evaluated while holding @done_mutex.
  checks =
    case what
    when :done then [:done?, :_done?]
    when :idle then [:idle?, :_idle?]
    end

  if checks
    reached, reached_raw = checks

    until send(reached)
      @done_mutex.synchronize do
        # Re-check under the lock before sleeping on the condition variable.
        next if send(reached_raw)

        @done.wait(@done_mutex)
      end
    end
  end

  self
end
Private Instance Methods
# File lib/thread/pool.rb, line 468
#
# Unsynchronized "all work consumed" check: the queue is empty and every
# spawned worker is waiting.
#
# @return [Boolean]
def _done?
  @todo.empty? && @waiting == @spawned
end
# File lib/thread/pool.rb, line 472
#
# Unsynchronized idle check: more workers are waiting than there are
# queued tasks.
#
# @return [Boolean]
def _idle?
  @waiting > @todo.size
end
# File lib/thread/pool.rb, line 476
#
# Wake every caller blocked on @done when the pool is done or has idle
# workers. The check and the broadcast happen under @done_mutex.
def done!
  @done_mutex.synchronize do
    @done.broadcast if _done? || _idle?
  end
end
# File lib/thread/pool.rb, line 378 def spawn_thread @spawned += 1 thread = Thread.new { loop do task = @mutex.synchronize { if @todo.empty? while @todo.empty? if @trim_requests > 0 @trim_requests -= 1 break end break if shutdown? @waiting += 1 done! if @idle_trim and @spawned > @min check_time = Time.now + @idle_trim @cond.wait @mutex, @idle_trim @trim_requests += 1 if Time.now >= check_time && @spawned - @trim_requests > @min else @cond.wait @mutex end @waiting -= 1 end break if @todo.empty? && shutdown? end @todo.shift } or break task.execute break if @shutdown == :now trim if auto_trim? && @spawned > @min end @mutex.synchronize { @spawned -= 1 @workers.delete thread } } @workers << thread thread end
# File lib/thread/pool.rb, line 433
#
# Start the background thread that enforces task timeouts.
#
# Creates the wake-up pipe (@pipes) and stores the thread in @timeout. The
# thread sleeps in IO.select until the earliest deadline among started
# tasks, or until something is written to the pipe, then calls timeout! on
# every started task whose deadline has passed and drops terminated or
# finished tasks from @timeouts. It leaves its loop once @shutdown is :now.
#
# @return [Thread] the timeout thread
def spawn_timeout_thread
  @pipes   = IO.pipe
  @timeout = Thread.new {
    loop do
      now = Time.now

      # Seconds until the earliest deadline. nil (wait until woken) when
      # no task is registered or none has started yet.
      timeout = @timeouts.map { |task, _time|
        next unless task.started_at

        # Time remaining until the deadline, not elapsed time plus timeout.
        # Clamped at zero because IO.select rejects negative timeouts.
        # NOTE(review): assumes timeout! leaves the task terminated? or
        # finished?, otherwise an overdue task is re-polled with no wait.
        remaining = task.started_at + task.timeout - now
        remaining > 0 ? remaining : 0
      }.compact.min unless @timeouts.empty?

      readable, = IO.select([@pipes.first], nil, nil, timeout)

      break if @shutdown == :now

      # Drain the wake-up bytes so the next select can block again.
      if readable && !readable.empty?
        readable.first.read_nonblock 1024
      end

      now = Time.now

      @timeouts.each { |task, _time|
        next if !task.started_at || task.terminated? || task.finished?

        task.timeout! if now > task.started_at + task.timeout
      }

      @timeouts.reject! { |task, _| task.terminated? || task.finished? }

      break if @shutdown == :now
    end
  }
end
# File lib/thread/pool.rb, line 360
#
# Register a timeout for +task+, starting the timeout thread on first use,
# and wake that thread.
#
# @param task the task to watch (used as the key in @timeouts)
# @param timeout the value stored for that task
# @return the result of wake_up_timeout
def timeout_for(task, timeout)
  spawn_timeout_thread unless @timeout

  @mutex.synchronize do
    @timeouts[task] = timeout

    wake_up_timeout
  end
end
# File lib/thread/pool.rb, line 372
#
# Poke the timeout thread by writing one byte to the wake-up pipe.
# Does nothing when the pipe was never created; write errors are ignored
# on purpose (best-effort wake-up).
#
# @return [Integer, nil] bytes written, or nil when nothing was written
def wake_up_timeout
  return unless defined?(@pipes)

  begin
    @pipes.last.write_nonblock('x')
  rescue StandardError
    nil
  end
end