class ActionPool::Thread

Public Class Methods

new(args) click to toggle source
:pool

pool thread is associated with

:t_timeout

max time a thread is allowed to wait for action

:a_timeout

max time thread is allowed to work

:respond_thread

thread to send exceptions to

:logger

LogHelper for logging messages

:autostart

Automatically start the thread

Create a new thread

# File lib/woolen_common/actionpool/thread.rb, line 18
# Create a new pool thread.
#
# args:: Hash of options:
#        :pool:: pool the thread is associated with (required)
#        :respond_thread:: thread that exceptions are raised on (required)
#        :t_timeout:: max seconds to wait for an action (converted with to_f, 0 when omitted)
#        :a_timeout:: max seconds to work on an action (converted with to_f, 0 when omitted)
#        :logger:: Logger instance; any other value is replaced by Logger.new(nil)
#        :autostart:: start the internal thread immediately (defaults to true)
#
# Raises ArgumentError when args is not a Hash or a required option is missing.
def initialize(args)
    raise ArgumentError.new('Hash required for initialization') unless args.is_a?(Hash)
    raise ArgumentError.new('ActionPool::Thread requires a pool') unless args[:pool]
    raise ArgumentError.new('ActionPool::Thread requries thread to respond') unless args[:respond_thread]
    @pool = args[:pool]
    @respond_to = args[:respond_thread]
    @thread_timeout = args[:t_timeout] ? args[:t_timeout].to_f : 0
    @action_timeout = args[:a_timeout] ? args[:a_timeout].to_f : 0
    # Read the option with its default instead of writing the default back
    # into the caller's hash (which also fails on a frozen hash).
    autostart = args.fetch(:autostart, true)
    @kill = false
    @logger = args[:logger].is_a?(Logger) ? args[:logger] : Logger.new(nil)
    @lock = Splib::Monitor.new
    @action = nil
    @thread = autostart ? ::Thread.new{ start_thread } : nil
end

Public Instance Methods

action_timeout() click to toggle source

Seconds thread will spend working on a given task

# File lib/woolen_common/actionpool/thread.rb, line 97
# Reader for the action timeout: the number of seconds the thread may spend
# working on a single task.
def action_timeout
    @action_timeout
end
action_timeout=(t) click to toggle source
t

seconds to work on a task (floats allow for values 0 < t < 1)

Set the maximum amount of time to work on a given task. Note: Modification of this will not affect actions already in progress.

# File lib/woolen_common/actionpool/thread.rb, line 114
# Set the maximum amount of time to work on a given task.
#
# t:: seconds to work on a task (floats allow for values 0 < t < 1), or nil
#     to remove the limit (stored as 0.0, which run treats as "no timeout")
#
# Returns the stored Float. Raises ArgumentError for any non-nil value whose
# to_f is not greater than zero.
# Note: modification will not affect actions already in progress.
def action_timeout=(t)
    # nil is an explicit "no limit" request, as the error message promises;
    # every other value must still convert to a positive number.
    seconds = t.nil? ? 0.0 : t.to_f
    raise ArgumentError.new('Value must be great than zero or nil') unless t.nil? || seconds > 0
    @action_timeout = seconds
    seconds
end
alive?() click to toggle source

Is the thread still alive

# File lib/woolen_common/actionpool/thread.rb, line 68
# Is the internal thread still alive?
#
# Returns false when no internal thread has been created yet (autostart
# disabled and start not called) instead of failing on nil.
def alive?
    !@thread.nil? && @thread.alive?
end
join() click to toggle source

Join internal thread

# File lib/woolen_common/actionpool/thread.rb, line 78
# Join the internal thread, waiting at most @action_timeout seconds. If the
# thread is still alive after that it is killed and then joined.
#
# Returns nil when there is no internal thread or it finished within the
# limit; otherwise the value of the final Thread#join.
#
# NOTE(review): with an action timeout of 0 the first join returns
# immediately, so a still-running thread is killed straight away. Confirm
# this is intended, since run treats 0 as "no limit".
def join
    # Nothing to wait for if the thread was never started
    return nil if @thread.nil?
    @thread.join(@action_timeout)
    if(@thread.alive?)
        @thread.kill
        @thread.join
    end
end
kill() click to toggle source

Kill internal thread

# File lib/woolen_common/actionpool/thread.rb, line 87
# Kill the internal thread.
#
# Returns the result of Thread#kill, or nil when no internal thread has been
# created yet (autostart disabled and start not called).
def kill
    @thread.kill if @thread
end
running?() click to toggle source

Currently running

# File lib/woolen_common/actionpool/thread.rb, line 62
        # True while an action is assigned to this thread (@action is not nil).
        def running?
            !@action.nil?
        end
start() click to toggle source
# File lib/woolen_common/actionpool/thread.rb, line 34
# Launch the internal thread running start_thread, unless one already exists.
#
# Returns the new Thread, or nil when a thread was already present.
def start
    return unless @thread.nil?
    @thread = ::Thread.new { start_thread }
end
status() click to toggle source

Current thread status

# File lib/woolen_common/actionpool/thread.rb, line 73
# Current thread status, reported as the action presently assigned to the
# thread (nil when no action is assigned).
def status
    @action
end
stop(*args) click to toggle source
:force

force the thread to stop

:wait

wait for the thread to stop

Stop the thread

# File lib/woolen_common/actionpool/thread.rb, line 41
# Stop the thread.
#
# args:: optional flags:
#        :force:: interrupt the thread even when it is busy with an action
#        :wait:: accepted, but not consulted by this method
#
# The kill flag is always set. The internal thread is only interrupted
# (Wakeup raised on it, then killed if still alive after a short pause) when
# :force is given or the thread is waiting for an action.
#
# Returns nil.
def stop(*args)
    @kill = true
    # Nothing to interrupt if the thread was never started
    return nil if @thread.nil?
    if(args.include?(:force) || waiting?)
        begin
            @thread.raise Wakeup.new
        rescue Wakeup
            #ignore since we are the caller
        end
        # give the thread a moment to exit on its own before killing it
        sleep(0.01)
        @thread.kill if @thread.alive?
    end
    nil
end
thread_timeout() click to toggle source

Seconds thread will wait for input

# File lib/woolen_common/actionpool/thread.rb, line 92
# Reader for the thread timeout: the number of seconds the thread will wait
# for input.
def thread_timeout
    @thread_timeout
end
thread_timeout=(t) click to toggle source
t

seconds to wait for input (floats allow for values 0 < t < 1)

Set the maximum amount of time to wait for a task

# File lib/woolen_common/actionpool/thread.rb, line 103
# Set the maximum amount of time to wait for a task.
#
# t:: seconds to wait for input (floats allow for values 0 < t < 1), or nil
#     to remove the limit (stored as 0.0, which start_thread treats as
#     "no timeout")
#
# If the thread is currently waiting, Retimeout is raised on it.
# Returns the stored Float. Raises ArgumentError for any non-nil value whose
# to_f is not greater than zero.
def thread_timeout=(t)
    # nil is an explicit "no limit" request, as the error message promises;
    # every other value must still convert to a positive number.
    seconds = t.nil? ? 0.0 : t.to_f
    raise ArgumentError.new('Value must be great than zero or nil') unless t.nil? || seconds > 0
    @thread_timeout = seconds
    # Skip the interrupt when the internal thread was never started
    @thread.raise Retimeout.new if @thread && waiting?
    seconds
end
waiting?() click to toggle source

Currently waiting

# File lib/woolen_common/actionpool/thread.rb, line 56
        # True while no action is assigned to this thread (@action is nil).
        def waiting?
            @action.nil?
        end

Private Instance Methods

run(action, args) click to toggle source
action

task to be run

args

arguments to be passed to task

Run the task

# File lib/woolen_common/actionpool/thread.rb, line 166
# Run the task.
#
# action:: task to be run (anything responding to call)
# args:: arguments to be passed to the task; flattened one level before the
#        call (via fixed_flatten when the object provides it)
#
# When @action_timeout is non-zero the call is wrapped in a timeout of that
# many seconds. A Timeout::Error is logged as a warning rather than
# propagated. Returns the task's result, or the logger's return value after
# a timeout.
def run(action, args)
    call_args = args.respond_to?(:fixed_flatten) ? args.fixed_flatten(1) : args.flatten(1)
    begin
        if @action_timeout.zero?
            # zero means "no limit"
            action.call(*call_args)
        else
            Timeout::timeout(@action_timeout) { action.call(*call_args) }
        end
    rescue Timeout::Error => boom
        @logger.warn("Pool thread reached max execution time for action: #{boom}")
    end
end
start_thread() click to toggle source

Start our thread

# File lib/woolen_common/actionpool/thread.rb, line 124
# Body of the internal thread: repeatedly fetch an action from the pool and
# run it until the kill flag is set, then remove this object from the pool.
#
# Exceptions other than Timeout::Error, Wakeup and Retimeout are logged and
# re-raised on the respond thread (@respond_to).
def start_thread
    begin
        @logger.info("New pool thread is starting (#{self})")
        until(@kill) do
            begin
                # nil marks the thread as waiting (see waiting?/running?)
                @action = nil
                # Only bound the wait when the pool holds more threads than its
                # minimum and a thread timeout is configured (zero disables it)
                if(@pool.size > @pool.min && !@thread_timeout.zero?)
                    Timeout::timeout(@thread_timeout) do
                        @action = @pool.action
                    end
                else
                    @action = @pool.action
                end
                # @action is used as a pair: [callable, arguments]
                run(@action[0], @action[1]) unless @action.nil?
            rescue Timeout::Error
                # the wait for an action timed out: let the loop end
                @kill = true
            rescue Wakeup
                # raised by stop; the loop condition then sees @kill
                @logger.info("Thread #{::Thread.current} was woken up.")
            rescue Retimeout
                # raised by thread_timeout=; the next iteration waits again
                # using the updated @thread_timeout
                @logger.warn('Thread was woken up to reset thread timeout')
            rescue Exception => boom
                @logger.error("Pool thread caught an exception: #{boom}\n#{boom.backtrace.join("\n")}")
                @respond_to.raise boom
            end
        end
    # The outer handlers cover exceptions raised outside the inner begin
    # (presumably Thread#raise arriving between iterations or while logging)
    rescue Retimeout
        @logger.warn('Thread was woken up to reset thread timeout')
        # re-run the outer begin block from the top
        retry
    rescue Wakeup
        @logger.info("Thread #{::Thread.current} was woken up.")
    rescue Exception => boom
        @logger.error("Pool thread caught an exception: #{boom}\n#{boom.backtrace.join("\n")}")
        @respond_to.raise boom
    ensure
        @logger.info("Pool thread is shutting down (#{self})")
        @pool.remove(self)
    end
end