class ActionPool::Pool

Public Class Methods

new(args={}) click to toggle source
:min_threads

minimum number of threads in pool

:max_threads

maximum number of threads in pool

:t_to

thread timeout waiting for action to process

:a_to

maximum time action may be worked on before aborting

:logger

logger to print logging messages to

Creates a new pool

# File lib/woolen_common/actionpool/pool.rb, line 18
# Builds the pool state from an options hash and then fills the pool.
#
# args:: Hash of options; the keys read here are :logger, :t_to, :a_to,
#        :max_threads, :min_threads and :respond_thread
#
# Raises ArgumentError when args is not a Hash.
def initialize(args={})
    unless args.is_a?(Hash)
        raise ArgumentError.new('Hash required for initialization')
    end
    supplied_logger = args[:logger]
    # Anything that is not a Logger instance is replaced by Logger.new(nil).
    @logger = (supplied_logger && supplied_logger.is_a?(Logger)) ? supplied_logger : Logger.new(nil)
    @queue = ActionPool::Queue.new
    @threads = []
    @lock = Splib::Monitor.new
    # Missing (or false) options fall back to the defaults on the right.
    @thread_timeout = args[:t_to] || 0
    @action_timeout = args[:a_to] || 0
    @max_threads = args[:max_threads] || 100
    @min_threads = args[:min_threads] || 10
    # The minimum is pulled down to the maximum when it would exceed it.
    if @max_threads < @min_threads
        @min_threads = @max_threads
    end
    # Defaults to the thread that is constructing the pool.
    @respond_to = args[:respond_thread] || ::Thread.current
    @open = true
    fill_pool
end

Public Instance Methods

<<(action) click to toggle source
action

proc to be executed or array of [proc, [*args]]

Add a new proc/lambda to be executed (alias for queue)

# File lib/woolen_common/actionpool/pool.rb, line 112
# Operator form of #queue.
#
# action:: a Proc, or a two element Array of [Proc, Array of arguments]
#
# Always returns nil. Raises ArgumentError for any other kind of input.
def <<(action)
    usage = 'Actions to be processed by the pool must be a proc/lambda or [proc/lambda, [*args]]'
    case action
    when Proc
        queue(action)
    when Array
        callable = action[0]
        arguments = action[1]
        well_formed = action.size == 2 && callable.is_a?(Proc) && arguments.is_a?(Array)
        raise ArgumentError.new(usage) unless well_formed
        # The argument list is passed to #queue as one positional value (not splatted).
        queue(callable, arguments)
    else
        raise ArgumentError.new(usage)
    end
    nil
end
action() click to toggle source

Returns the next action to be processed

# File lib/woolen_common/actionpool/pool.rb, line 251
# Pops one entry off @queue and returns it.
def action
    next_action = @queue.pop
    next_action
end
action_size() click to toggle source

Number of actions in the queue

# File lib/woolen_common/actionpool/pool.rb, line 256
# Reports how many entries @queue currently holds.
def action_size
    pending = @queue
    pending.size
end
action_timeout() click to toggle source

Maximum number of seconds a thread is allowed to work on a given action (nil means thread is given unlimited time to work on action)

# File lib/woolen_common/actionpool/pool.rb, line 225
# Reader for the per-action time limit stored in @action_timeout.
def action_timeout
    return @action_timeout
end
action_timeout=(t) click to toggle source
t

timeout in seconds (nil for infinite)

Set maximum allowed time thread may work on a given action

# File lib/woolen_common/actionpool/pool.rb, line 242
# Stores a new per-action time limit and hands it to every pooled thread.
#
# t:: new limit; converted with #to_f, so nil becomes 0.0
#
# Returns the converted value. Raises ArgumentError unless it is >= 0.
def action_timeout=(t)
    seconds = t.to_f
    unless seconds >= 0
        raise ArgumentError.new('Value must be greater than zero or nil')
    end
    @action_timeout = seconds
    # Keep the threads already in the pool in step with the new setting.
    @threads.each do |worker|
        worker.action_timeout = seconds
    end
    seconds
end
add_jobs(jobs) click to toggle source
jobs

Array of proc/lambdas

Will queue a list of jobs into the pool

# File lib/woolen_common/actionpool/pool.rb, line 137
# Queues a batch of jobs while the queue is paused, then requests extra
# threads for the jobs that have no waiting thread to pick them up.
#
# jobs:: Array whose elements are either a Proc or [Proc, Array of arguments]
#
# Returns true. Raises PoolClosed when the pool is closed and ArgumentError
# when jobs is not an Array or contains a malformed element. Elements that
# precede a malformed one have already been queued when the error is raised.
def add_jobs(jobs)
    raise PoolClosed.new("Pool #{self} is currently closed") if pool_closed?
    raise ArgumentError.new("Expecting an array but received: #{jobs.class}") unless jobs.is_a?(Array)
    @queue.pause
    begin
        jobs.each do |job|
            case job
            when Proc
                @queue << [job, []]
            when Array
                unless job.size == 2 && job[0].is_a?(Proc) && job[1].is_a?(Array)
                    raise ArgumentError.new('Jobs to be processed by the pool must be a proc/lambda or [proc/lambda, [*args]]')
                end
                # Build the queued entry from fresh arrays. The entry has the same
                # [proc, [[*args]]] shape as before, but the caller's job array is
                # no longer emptied with #shift as a side effect.
                @queue << [job[0], [job[1]]]
            else
                raise ArgumentError.new('Jobs to be processed by the pool must be a proc/lambda or [proc/lambda, [*args]]')
            end
        end
    ensure
        begin
            # One extra thread per job that exceeds the number of waiting threads.
            num = jobs.size - @threads.select{|t|t.waiting?}.size
            num.times{ create_thread(:nowait) } if num > 0
        ensure
            # Always release the queue, even when spawning threads failed.
            @queue.unpause
        end
    end
    true
end
create_thread(*args) click to toggle source
args

:force forces a new thread. :nowait will create a thread if threads are waiting

Create a new thread for pool. Returns newly created thread or nil if pool is at maximum size

# File lib/woolen_common/actionpool/pool.rb, line 55
# Adds one thread to the pool when the conditions below allow it.
#
# args:: optional flags; :nowait and :force are the ones inspected here
#
# Returns the started ActionPool::Thread, or nil when the pool is closed or
# no thread was created.
def create_thread(*args)
    return if pool_closed?
    spawned = nil
    @lock.synchronize do
        # A thread is wanted when every pooled thread is working, or on :nowait.
        wanted = (size == working) || args.include?(:nowait)
        # :force skips the @max_threads limit.
        if (wanted && @threads.size < @max_threads) || args.include?(:force)
            spawned = ActionPool::Thread.new(
                :pool => self,
                :respond_thread => @respond_to,
                :a_timeout => @action_timeout,
                :t_timeout => @thread_timeout,
                :logger => @logger,
                :autostart => false
            )
            @threads << spawned
        end
    end
    # The thread is started only after the lock has been released.
    spawned.start if spawned
    spawned
end
fill_pool() click to toggle source

Fills the pool with the minimum number of threads. Returns an array of the created threads.

# File lib/woolen_common/actionpool/pool.rb, line 71
# Tops the pool up to min threads while the pool is open.
#
# Returns the Array of threads created by this call (empty when the pool is
# closed or already holds at least min threads).
def fill_pool
    return [] unless @open
    spawned = []
    @lock.synchronize do
        missing = min - size
        if missing > 0
            missing.times do
                worker = ActionPool::Thread.new(
                    :pool => self,
                    :respond_thread => @respond_to,
                    :a_timeout => @action_timeout,
                    :t_timeout => @thread_timeout,
                    :logger => @logger,
                    :autostart => false
                )
                @threads << worker
                spawned << worker
            end
        end
    end
    # Threads are started once the lock has been released.
    spawned.each do |worker|
        worker.start
    end
    spawned
end
flush() click to toggle source

Flush the thread pool. Mainly used for forcibly resizing the pool if existing threads have a long thread life waiting for input.

# File lib/woolen_common/actionpool/pool.rb, line 263
# Flushes the pool by queueing one blocking action per pooled thread,
# waiting for the queue to drain, and then broadcasting on the monitor the
# actions wait on.
#
# The previous form called `queue{ mon.wait }`, which passes only a block to
# queue(action, *args) and therefore raised ArgumentError as soon as the pool
# held at least one thread. The action is now passed as an explicit Proc.
#
# Returns whatever mon.broadcast returns.
def flush
    mon = Splib::Monitor.new
    # NOTE(review): assumes Splib::Monitor#wait blocks until #broadcast is called - confirm.
    blocker = proc { mon.wait }
    @threads.size.times{ queue(blocker) }
    @queue.wait_empty
    # Short pause before the broadcast, kept from the original implementation.
    sleep(0.01)
    mon.broadcast
end
max() click to toggle source

Maximum allowed number of threads

# File lib/woolen_common/actionpool/pool.rb, line 174
# Reader for the thread ceiling stored in @max_threads.
def max
    return @max_threads
end
max=(m) click to toggle source
m

new max

Set maximum number of threads

# File lib/woolen_common/actionpool/pool.rb, line 185
# Stores a new thread ceiling.
#
# m:: new maximum; converted with #to_i
#
# Lowers @min_threads when it would exceed the new ceiling and calls resize
# when the pool currently holds more threads than allowed.
# Returns the converted value. Raises ArgumentError unless it is > 0.
def max=(m)
    ceiling = m.to_i
    unless ceiling > 0
        raise ArgumentError.new('Maximum value must be greater than 0')
    end
    @max_threads = ceiling
    if ceiling < @min_threads
        @min_threads = ceiling
    end
    if ceiling < size
        resize
    end
    ceiling
end
min() click to toggle source

Minimum allowed number of threads

# File lib/woolen_common/actionpool/pool.rb, line 179
# Reader for the thread floor stored in @min_threads.
def min
    return @min_threads
end
min=(m) click to toggle source
m

new min

Set minimum number of threads

# File lib/woolen_common/actionpool/pool.rb, line 196
# Stores a new thread floor.
#
# m:: new minimum; converted with #to_i
#
# Returns the converted value. Raises ArgumentError unless 0 < value <= max.
def min=(m)
    floor = m.to_i
    unless floor > 0 && floor <= max
        raise ArgumentError.new("Minimum value must be greater than 0 and less than or equal to maximum (#{max})")
    end
    @min_threads = floor
    floor
end
pool_closed?() click to toggle source

Pool is closed

# File lib/woolen_common/actionpool/pool.rb, line 35
# True when @open is false or nil, false otherwise.
def pool_closed?
    @open ? false : true
end
pool_open?() click to toggle source

Pool is open

# File lib/woolen_common/actionpool/pool.rb, line 40
# Returns the @open flag as stored.
def pool_open?
    return @open
end
process(*args, &block) click to toggle source
block

block to process

Adds a block to be processed

# File lib/woolen_common/actionpool/pool.rb, line 163
# Block form of #queue: the given block becomes the action and args become
# its arguments.
#
# Always returns nil.
def process(*args, &block)
    # Same call as queue(block, *args), with the block placed first.
    queue(*([block] + args))
    nil
end
queue(action, *args) click to toggle source
action

proc to be executed

Add a new proc/lambda to be executed

# File lib/woolen_common/actionpool/pool.rb, line 127
# Appends one action to the queue and asks the pool for a thread.
#
# action:: Proc to execute
# args::   arguments stored alongside the action
#
# Returns the result of create_thread. Raises PoolClosed when the pool is
# closed and ArgumentError when action is not a Proc.
def queue(action, *args)
    if pool_closed?
        raise PoolClosed.new("Pool #{self} is currently closed")
    end
    unless action.is_a?(Proc)
        raise ArgumentError.new('Expecting block')
    end
    entry = [action, args]
    @queue << entry
    # Yield the scheduler to other threads before deciding whether to add one.
    ::Thread.pass
    create_thread
end
remove(t) click to toggle source
t

ActionPool::Thread to remove

Removes a thread from the pool

# File lib/woolen_common/actionpool/pool.rb, line 205
# Stops a thread, drops it from the pool if present, and refills the pool.
#
# t:: ActionPool::Thread to remove
#
# Returns true when the thread was in the pool, false otherwise.
# Raises ArgumentError when t is not an ActionPool::Thread.
def remove(t)
    unless t.is_a?(ActionPool::Thread)
        raise ArgumentError.new('Expecting an ActionPool::Thread object')
    end
    # The thread is stopped whether or not the pool still tracks it.
    t.stop
    was_member = @threads.include?(t)
    if was_member
        @threads.delete(t)
    end
    fill_pool
    was_member
end
shutdown(force=false) click to toggle source
force

force immediate stop

Stop the pool

# File lib/woolen_common/actionpool/pool.rb, line 93
# Closes the pool and stops every pooled thread.
#
# force:: when truthy the queue is cleared first and threads are stopped
#         with :force; otherwise the pool is flushed and each stopped
#         thread is joined
#
# The previous form joined `@threads` after the pop loop had already emptied
# it, so nothing was ever joined. The stopped threads are now remembered and
# joined on the non-forced path.
#
# Always returns nil.
def shutdown(force=false)
    status(:closed)
    args = []
    args.push(:force) if force
    @logger.info("Pool is now shutting down #{force ? 'using force' : ''}")
    @queue.clear if force
    @queue.wait_empty
    stopped = []
    while(t = @threads.pop) do
        t.stop(*args)
        stopped << t
    end
    unless(force)
        flush
        # Join the threads taken out of @threads above, in the order they were stopped.
        stopped.each{|thread| thread.join}
    end
    nil
end
size() click to toggle source

Current size of pool

# File lib/woolen_common/actionpool/pool.rb, line 169
# Number of threads currently tracked in @threads.
def size
    pooled = @threads
    pooled.size
end
status(arg) click to toggle source
arg

:open or :closed

Set pool status

# File lib/woolen_common/actionpool/pool.rb, line 46
# Opens or closes the pool.
#
# arg:: :open opens the pool; any other value closes it
#
# Returns the result of fill_pool when the pool was opened, nil otherwise.
def status(arg)
    @open = (arg == :open)
    return fill_pool if @open
    nil
end
thread_stats() click to toggle source
# File lib/woolen_common/actionpool/pool.rb, line 276
# Returns one [object_id, status] pair per pooled thread.
def thread_stats
    @threads.map do |worker|
        [worker.object_id, worker.status]
    end
end
thread_timeout() click to toggle source

Maximum number of seconds a thread is allowed to idle in the pool. (nil means thread life is infinite)

# File lib/woolen_common/actionpool/pool.rb, line 217
# Reader for the idle time limit stored in @thread_timeout.
def thread_timeout
    return @thread_timeout
end
thread_timeout=(t) click to toggle source
t

timeout in seconds (nil for infinite)

Set maximum allowed time a thread may idle in the pool

# File lib/woolen_common/actionpool/pool.rb, line 231
# Stores a new idle time limit and hands it to every pooled thread.
#
# t:: new limit; converted with #to_f, so nil becomes 0.0
#
# Returns the converted value. Raises ArgumentError unless it is >= 0.
def thread_timeout=(t)
    seconds = t.to_f
    unless seconds >= 0
        raise ArgumentError.new('Value must be greater than zero or nil')
    end
    @thread_timeout = seconds
    # Keep the threads already in the pool in step with the new setting.
    @threads.each do |worker|
        worker.thread_timeout = seconds
    end
    seconds
end
working() click to toggle source

Returns current number of threads in the pool working

# File lib/woolen_common/actionpool/pool.rb, line 272
# Counts the pooled threads whose running? is truthy.
def working
    @threads.count { |worker| worker.running? }
end

Private Instance Methods

resize() click to toggle source

Resize the pool

# File lib/woolen_common/actionpool/pool.rb, line 283
# Shrinks the pool until it holds at most max threads, then flushes it.
#
# Waiting threads are chosen first; when none is waiting the oldest entry in
# @threads is taken. The chosen thread is removed from @threads before it is
# stopped. Previously a waiting thread found with #find stayed in @threads,
# so the loop could only end if something else removed it.
#
# Always returns nil.
def resize
    @logger.info("Pool is being resized to stated maximum: #{max}")
    until(size <= max) do
        victim = @threads.find{|thread| thread.waiting?} || @threads.first
        # Drop the thread from the pool here so each pass reduces size by one.
        @threads.delete(victim)
        victim.stop
    end
    flush
    nil
end