class ActionPool::Pool
Public Class Methods
- :min_threads
-
minimum number of threads in pool
- :max_threads
-
maximum number of threads in pool
- :t_to
-
thread timeout waiting for action to process
- :a_to
-
maximum time action may be worked on before aborting
- :logger
-
logger to print logging messages to
Creates a new pool
# File lib/woolen_common/actionpool/pool.rb, line 18
# Creates a new pool and fills it up to the minimum thread count.
#
# args:: Hash of options (all optional):
#        :min_threads    - minimum number of threads in pool (default 10)
#        :max_threads    - maximum number of threads in pool (default 100)
#        :t_to           - thread timeout waiting for action to process (default 0)
#        :a_to           - maximum time action may be worked on before aborting (default 0)
#        :logger         - Logger instance; anything else is replaced by Logger.new(nil)
#        :respond_thread - value handed to each pool thread as :respond_thread
#                          (default: the calling ::Thread)
#
# Raises ArgumentError when args is not a Hash.
def initialize(args = {})
  raise ArgumentError, 'Hash required for initialization' unless args.is_a?(Hash)

  supplied_logger = args[:logger]
  @logger = supplied_logger.is_a?(Logger) ? supplied_logger : Logger.new(nil)
  @queue = ActionPool::Queue.new
  @threads = []
  @lock = Splib::Monitor.new
  @thread_timeout = args[:t_to] || 0
  @action_timeout = args[:a_to] || 0
  @max_threads = args[:max_threads] || 100
  @min_threads = args[:min_threads] || 10
  # The minimum may never exceed the maximum; clamp it down if it does.
  @min_threads = @max_threads if @max_threads < @min_threads
  @respond_to = args[:respond_thread] || ::Thread.current
  @open = true
  fill_pool
end
Public Instance Methods
- action
-
proc to be executed or array of [proc, [*args]]
Add a new proc/lambda to be executed (alias for queue)
# File lib/woolen_common/actionpool/pool.rb, line 112
# Adds a new proc/lambda to be executed (operator form of #queue).
#
# action:: a Proc, or a two-element Array of [Proc, Array-of-args]
#
# Always returns nil. Raises ArgumentError for any other input shape.
def <<(action)
  usage = 'Actions to be processed by the pool must be a proc/lambda or [proc/lambda, [*args]]'
  if action.is_a?(Proc)
    queue(action)
  elsif action.is_a?(Array)
    callable, call_args = action[0], action[1]
    well_formed = action.size == 2 && callable.is_a?(Proc) && call_args.is_a?(Array)
    raise ArgumentError, usage unless well_formed
    # The argument array is handed to #queue as a single positional value.
    queue(callable, call_args)
  else
    raise ArgumentError, usage
  end
  nil
end
Returns the next action to be processed
# File lib/woolen_common/actionpool/pool.rb, line 251
# Returns the next action to be processed by popping it off the pool's queue.
def action
  @queue.pop
end
Number of actions in the queue
# File lib/woolen_common/actionpool/pool.rb, line 256
# Number of actions currently held in the queue.
def action_size
  @queue.size
end
Maximum number of seconds a thread is allowed to work on a given action (nil means thread is given unlimited time to work on action)
# File lib/woolen_common/actionpool/pool.rb, line 225
# Maximum number of seconds a thread is allowed to work on a given action,
# exactly as currently stored on the pool.
def action_timeout
  @action_timeout
end
- t
-
timeout in seconds (nil for infinite)
Set maximum allowed time thread may work on a given action
# File lib/woolen_common/actionpool/pool.rb, line 242
# Sets the maximum allowed time a thread may work on a given action and
# pushes the new value to every thread currently in the pool.
#
# t:: timeout in seconds; coerced with #to_f, so nil becomes 0.0
#
# Returns the coerced Float. Raises ArgumentError when the coerced value
# is not >= 0.
def action_timeout=(t)
  seconds = t.to_f
  raise ArgumentError, 'Value must be greater than zero or nil' unless seconds >= 0

  @action_timeout = seconds
  @threads.each { |worker| worker.action_timeout = seconds }
  seconds
end
- jobs
-
Array
of proc/lambdas
Will queue a list of jobs into the pool
# File lib/woolen_common/actionpool/pool.rb, line 137
# Queues a list of jobs into the pool in one batch.
#
# jobs:: Array whose elements are either a Proc or a two-element Array of
#        [Proc, Array-of-args]
#
# The queue is paused while the jobs are appended. Afterwards one thread is
# requested (via create_thread(:nowait)) for every job that exceeds the
# number of currently waiting threads, and the queue is unpaused. That
# bookkeeping runs in an ensure block, so it also happens when a malformed
# job aborts the batch part-way through.
#
# The caller's job arrays are left untouched.
#
# Returns true. Raises PoolClosed when the pool is closed and ArgumentError
# when jobs is not an Array or contains a malformed entry.
def add_jobs(jobs)
  raise PoolClosed.new("Pool #{self} is currently closed") if pool_closed?
  raise ArgumentError.new("Expecting an array but received: #{jobs.class}") unless jobs.is_a?(Array)
  @queue.pause
  begin
    jobs.each do |job|
      case job
      when Proc
        @queue << [job, []]
      when Array
        unless job.size == 2 && job[0].is_a?(Proc) && job[1].is_a?(Array)
          raise ArgumentError.new('Jobs to be processed by the pool must be a proc/lambda or [proc/lambda, [*args]]')
        end
        # Enqueue [proc, [args]] using a fresh wrapper array instead of
        # shifting the proc off the caller's array.
        @queue << [job[0], [job[1]]]
      else
        raise ArgumentError.new('Jobs to be processed by the pool must be a proc/lambda or [proc/lambda, [*args]]')
      end
    end
  ensure
    num = jobs.size - @threads.count { |t| t.waiting? }
    num.times { create_thread(:nowait) } if num > 0
    @queue.unpause
  end
  true
end
- args
-
:force forces a new thread. :nowait will create a thread if threads are waiting
Create a new thread for pool. Returns newly created thread or nil if pool is at maximum size
# File lib/woolen_common/actionpool/pool.rb, line 55
# Creates a new thread for the pool.
#
# args:: optional flags:
#        :force  - always create a thread, ignoring the maximum
#        :nowait - create a thread (while below the maximum) even if not
#                  every existing thread is busy
#
# Without flags a thread is only added when every current thread is working
# and the pool is below its maximum. Returns the newly created thread, or
# nil when nothing was created or the pool is closed.
def create_thread(*args)
  return if pool_closed?

  worker = nil
  @lock.synchronize do
    wanted = size == working || args.include?(:nowait)
    if (wanted && @threads.size < @max_threads) || args.include?(:force)
      worker = ActionPool::Thread.new(
        :pool => self,
        :respond_thread => @respond_to,
        :a_timeout => @action_timeout,
        :t_timeout => @thread_timeout,
        :logger => @logger,
        :autostart => false
      )
      @threads << worker
    end
  end
  # The thread is started only after the lock has been released.
  worker.start if worker
  worker
end
Fills the pool with the minimum number of threads Returns array of created threads
# File lib/woolen_common/actionpool/pool.rb, line 71
# Fills the pool up to the minimum number of threads.
#
# Does nothing while the pool is closed. Returns the array of threads that
# were created by this call (empty when none were needed).
def fill_pool
  return [] unless @open

  started = []
  @lock.synchronize do
    missing = min - size
    if missing > 0
      missing.times do
        worker = ActionPool::Thread.new(
          :pool => self,
          :respond_thread => @respond_to,
          :a_timeout => @action_timeout,
          :t_timeout => @thread_timeout,
          :logger => @logger,
          :autostart => false
        )
        @threads << worker
        started << worker
      end
    end
  end
  # New threads are started only after the lock has been released.
  started.each { |worker| worker.start }
  started
end
Flush the thread pool. Mainly used for forcibly resizing the pool if existing threads have a long thread life waiting for input.
# File lib/woolen_common/actionpool/pool.rb, line 263
# Flushes the thread pool. Mainly used for forcibly resizing the pool if
# existing threads have a long thread life waiting for input.
#
# One blocking action (waiting on a private monitor) is queued per thread
# currently in the pool; once the queue reports empty the monitor is
# broadcast so the blocked actions are released.
#
# The blocker is passed to #queue as a positional lambda because #queue
# requires its action as an argument and does not accept a bare block.
def flush
  mon = Splib::Monitor.new
  blocker = lambda { mon.wait }
  @threads.size.times { queue(blocker) }
  @queue.wait_empty
  # Brief pause before broadcasting.
  # NOTE(review): presumably gives threads time to reach mon.wait - confirm.
  sleep(0.01)
  mon.broadcast
end
Maximum allowed number of threads
# File lib/woolen_common/actionpool/pool.rb, line 174
# Maximum allowed number of threads, as currently configured.
def max
  @max_threads
end
- m
-
new max
Set maximum number of threads
# File lib/woolen_common/actionpool/pool.rb, line 185
# Sets the maximum number of threads.
#
# m:: new max; coerced with #to_i
#
# The minimum is lowered to match when it would otherwise exceed the new
# maximum, and the pool is resized when it currently holds more threads
# than allowed. Returns the coerced Integer. Raises ArgumentError unless the
# coerced value is greater than 0.
def max=(m)
  limit = m.to_i
  raise ArgumentError, 'Maximum value must be greater than 0' unless limit > 0

  @max_threads = limit
  @min_threads = limit if limit < @min_threads
  resize if limit < size
  limit
end
Minimum allowed number of threads
# File lib/woolen_common/actionpool/pool.rb, line 179
# Minimum allowed number of threads, as currently configured.
def min
  @min_threads
end
- m
-
new min
Set minimum number of threads
# File lib/woolen_common/actionpool/pool.rb, line 196
# Sets the minimum number of threads.
#
# m:: new min; coerced with #to_i
#
# Returns the coerced Integer. Raises ArgumentError unless the coerced value
# is greater than 0 and no larger than the current maximum.
def min=(m)
  floor = m.to_i
  unless floor > 0 && floor <= max
    raise ArgumentError, "Minimum value must be greater than 0 and less than or equal to maximum (#{max})"
  end

  @min_threads = floor
  floor
end
Pool
is closed
# File lib/woolen_common/actionpool/pool.rb, line 35
# True when the pool is closed, i.e. the open flag is not set.
def pool_closed?
  !@open
end
Pool
is open
# File lib/woolen_common/actionpool/pool.rb, line 40
# Returns the pool's open flag (truthy while the pool is open).
def pool_open?
  @open
end
- block
-
block to process
Adds a block to be processed
# File lib/woolen_common/actionpool/pool.rb, line 163
# Adds a block to be processed.
#
# args::  arguments forwarded to #queue after the block
# block:: block to process; it is handed to #queue as the action
#
# Always returns nil.
def process(*args, &block)
  queue(block, *args)
  nil
end
- action
-
proc to be executed
Add a new proc/lambda to be executed
# File lib/woolen_common/actionpool/pool.rb, line 127
# Adds a new proc/lambda to be executed.
#
# action:: proc to be executed
# args::   arguments stored alongside the proc
#
# After enqueueing, the current thread yields and the pool is asked for an
# additional thread. Returns whatever #create_thread returns. Raises
# PoolClosed when the pool is closed and ArgumentError when action is not a
# Proc.
def queue(action, *args)
  raise PoolClosed.new("Pool #{self} is currently closed") if pool_closed?
  raise ArgumentError, 'Expecting block' unless action.is_a?(Proc)

  entry = [action, args]
  @queue << entry
  ::Thread.pass
  create_thread
end
- t
-
ActionPool::Thread
to remove
Removes a thread from the pool
# File lib/woolen_common/actionpool/pool.rb, line 205
# Removes a thread from the pool.
#
# t:: ActionPool::Thread to remove
#
# The thread is stopped whether or not it belongs to this pool, and the pool
# is refilled afterwards. Returns true when the thread was a member of the
# pool, false otherwise. Raises ArgumentError for any other type.
def remove(t)
  raise ArgumentError, 'Expecting an ActionPool::Thread object' unless t.is_a?(ActionPool::Thread)

  t.stop
  # Array#delete returns nil when nothing matched.
  was_member = !@threads.delete(t).nil?
  fill_pool
  was_member
end
- force
-
force immediate stop
Stop the pool
# File lib/woolen_common/actionpool/pool.rb, line 93
# Stops the pool.
#
# force:: force immediate stop (clears the queue and passes :force to every
#         thread's stop call)
#
# The pool is marked closed, the queue is drained (or cleared when forced),
# and every thread is removed from the pool and stopped. For a non-forced
# shutdown the stopped threads are then joined. They are collected while
# being popped, because the pool's thread list is empty once the stop loop
# finishes and could not be used for the join.
#
# Always returns nil.
def shutdown(force=false)
  status(:closed)
  args = []
  args.push(:force) if force
  @logger.info("Pool is now shutting down #{force ? 'using force' : ''}")
  @queue.clear if force
  @queue.wait_empty
  stopped = []
  while (t = @threads.pop)
    t.stop(*args)
    stopped << t
  end
  unless force
    flush
    stopped.each { |thread| thread.join }
  end
  nil
end
Current size of pool
# File lib/woolen_common/actionpool/pool.rb, line 169
# Current size of the pool (number of threads it holds).
def size
  @threads.size
end
- arg
-
:open or :closed
Set pool status
# File lib/woolen_common/actionpool/pool.rb, line 46
# Sets the pool status.
#
# arg:: :open or :closed (anything other than :open closes the pool)
#
# When the pool is opened it is refilled and the result of #fill_pool is
# returned; otherwise returns nil.
def status(arg)
  @open = (arg == :open)
  return unless @open

  fill_pool
end
# File lib/woolen_common/actionpool/pool.rb, line 276
# Returns one [object_id, status] pair for every thread in the pool.
def thread_stats
  @threads.map do |worker|
    [worker.object_id, worker.status]
  end
end
Maximum number of seconds a thread is allowed to idle in the pool. (nil means thread life is infinite)
# File lib/woolen_common/actionpool/pool.rb, line 217
# Maximum number of seconds a thread is allowed to idle in the pool,
# exactly as currently stored on the pool.
def thread_timeout
  @thread_timeout
end
- t
-
timeout in seconds (nil for infinite)
Set maximum allowed time thread may idle in pool
# File lib/woolen_common/actionpool/pool.rb, line 231
# Sets the maximum allowed time a thread may idle in the pool and pushes the
# new value to every thread currently in the pool.
#
# t:: timeout in seconds; coerced with #to_f, so nil becomes 0.0
#
# Returns the coerced Float. Raises ArgumentError when the coerced value
# is not >= 0.
def thread_timeout=(t)
  seconds = t.to_f
  raise ArgumentError, 'Value must be greater than zero or nil' unless seconds >= 0

  @thread_timeout = seconds
  @threads.each { |worker| worker.thread_timeout = seconds }
  seconds
end
Returns current number of threads in the pool working
# File lib/woolen_common/actionpool/pool.rb, line 272
# Returns the number of threads in the pool that report running?.
def working
  @threads.count(&:running?)
end
Private Instance Methods
Resize the pool
# File lib/woolen_common/actionpool/pool.rb, line 283
# Resizes the pool down to the stated maximum.
#
# While the pool holds more threads than allowed, a waiting thread is
# stopped if one exists; otherwise the oldest thread is shifted off the
# pool and stopped. The pool is flushed afterwards. Always returns nil.
#
# NOTE(review): a waiting thread found via find is stopped but not removed
# from the list here - presumably the stopped thread removes itself from the
# pool; confirm against ActionPool::Thread.
def resize
  @logger.info("Pool is being resized to stated maximum: #{max}")
  while size > max
    victim = @threads.find { |candidate| candidate.waiting? } || @threads.shift
    victim.stop
  end
  flush
  nil
end