class Utilrb::ThreadPool::Task
A Task is executed by the thread pool as soon as a free thread is available.
@author Alexander Duda <Alexander.Duda@dfki.de>
Constants
- Asked
Attributes
Custom description which can be used to store a human-readable description of the object
The exception thrown by the custom code block
@return [Exception] the exception
Thread pool the task belongs to
@return [ThreadPool] the thread pool
The time the task was queued
@return [Time] the time
Result of the code block call
The time the task was started
@return [Time] the time
State of the task
@return [:waiting,:running,:stopping,:finished,:terminated,:exception] the state
The time the task was stopped or finished
@return [Time] the time
The sync key is used to specify that a given task must not run in parallel with another task having the same sync key. If no key is set, there are no such constraints for the task.
@return the sync key
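The effect of a sync key can be illustrated with a small selection routine. This is only a sketch of the idea, not the scheduling code of the thread pool: `QueuedTask`, `next_runnable` and `keys_in_use` are hypothetical names introduced for the example.

```ruby
require 'set'

# Hypothetical stand-in for a queued task; only the sync key matters here.
QueuedTask = Struct.new(:name, :sync_key)

# Returns the first waiting task whose sync key is not held by a running
# task. Tasks without a sync key (nil) are never blocked.
def next_runnable(waiting, keys_in_use)
  waiting.find do |task|
    task.sync_key.nil? || !keys_in_use.include?(task.sync_key)
  end
end

waiting = [
  QueuedTask.new('a', :robot),
  QueuedTask.new('b', nil),
  QueuedTask.new('c', :camera)
]
keys_in_use = Set[:robot] # assumed: a task with key :robot is running

picked = next_runnable(waiting, keys_in_use)
puts picked.name # prints "b"
```

Task `a` is skipped because its key is already in use, while `b` has no key and can therefore run at any time.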
The thread the task was assigned to
@return [Thread] the thread
Public Class Methods
Creates a new task which can be added to the work queue of a {ThreadPool}. If a sync key is given, no other task having the same key will be executed in parallel. This is useful for instance member calls which are not thread safe.
@param [Hash] options The options of the task.
@option options [Object] :sync_key The sync key
@option options [Proc] :callback The callback
@option options [Object] :default Default value returned when an error occurred which was handled.
@param [Array] args The arguments for the code block
@param [#call] block The code block
# File lib/utilrb/thread_pool.rb, line 128
def initialize(options = Hash.new, *args, &block)
    unless block
        raise ArgumentError, 'you must pass a work block to initialize a new Task.'
    end
    options = Kernel.validate_options(options,{:sync_key => nil,:default => nil,:callback => nil})
    @sync_key = options[:sync_key]
    @arguments = args
    @default = options[:default]
    @callback = options[:callback]
    @block = block
    @mutex = Mutex.new
    @pool = nil
    @state_temp = nil
    @state = nil
    reset
end
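The argument order of the constructor is easy to get wrong: the options hash comes first, and every following positional argument is passed to the block. The sketch below mirrors only this argument handling in a hypothetical `SketchTask` class so that it runs without utilrb. It assumes that unknown option keys are rejected, which is how `Kernel.validate_options` is used here but is not shown in the listing above.

```ruby
# Hypothetical stand-in that mirrors only the argument handling of the
# constructor shown above; it is not the utilrb class.
class SketchTask
  ALLOWED = { sync_key: nil, default: nil, callback: nil }.freeze

  attr_reader :sync_key, :default, :arguments

  def initialize(options = {}, *args, &block)
    raise ArgumentError, 'a work block is required' unless block

    # Assumption: unknown keys are an error.
    unknown = options.keys - ALLOWED.keys
    raise ArgumentError, "unknown options: #{unknown.inspect}" unless unknown.empty?

    options = ALLOWED.merge(options)
    @sync_key = options[:sync_key]
    @default = options[:default]
    @callback = options[:callback]
    @arguments = args
    @block = block
  end

  # Runs the block with the stored arguments (no threading in this sketch).
  def call
    @block.call(*@arguments)
  end
end

# Options first, then the arguments handed to the block.
task = SketchTask.new({ sync_key: :adder, default: 0 }, 2, 3) { |a, b| a + b }
puts task.call # prints 5
```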
Public Instance Methods
Sets the callback, which is called from the worker thread when the work is done
@yield [Object,Exception] The callback
# File lib/utilrb/thread_pool.rb, line 231
def callback(&block)
    @mutex.synchronize do
        @callback = block
    end
end
Returns true if the task has a default return value
@return [Boolean]
# File lib/utilrb/thread_pool.rb, line 164
def default?
    @mutex.synchronize do
        @default != nil
    end
end
Checks if an exception occurred.
@return [Boolean]
# File lib/utilrb/thread_pool.rb, line 115
def exception?; @state == :exception; end
Executes the task. Should be called from a worker thread after pre_execute was called. After execute has returned and the task has been deleted from any internal list, finalize must be called to propagate the task state.
# File lib/utilrb/thread_pool.rb, line 188
def execute()
    raise RuntimeError, "call pre_execute ThreadPool::Task first. Current state is #{@state} but :running was expected" if @state != :running
    @state_temp = begin
        @result = @block.call(*@arguments)
        :finished
    rescue Exception => e
        @exception = e
        if e.is_a? Asked
            :terminated
        else
            :exception
        end
    end
    @stopped_at = Time.now
end
Propagates the task's state. Should be called after execute.
# File lib/utilrb/thread_pool.rb, line 206
def finalize
    @mutex.synchronize do
        @thread = nil
        @state = @state_temp
        @pool = nil
    end
    begin
        @callback.call @result,@exception if @callback
    rescue Exception => e
        ThreadPool.report_exception("thread_pool: in #{self}, callback #{@callback} failed", e)
    end
end
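The three calls pre_execute, execute and finalize form a small protocol: the outcome of the block is kept in a temporary variable and only becomes visible as the task state once finalize runs. The following sketch reproduces that protocol in a hypothetical `LifecycleSketch` class. It leaves out the mutex, the pool, the callback and the termination handling, and it rescues `StandardError` rather than `Exception`, so it is a simplified illustration and not the library code.

```ruby
# Hypothetical stand-in reproducing the three-step protocol described above.
class LifecycleSketch
  attr_reader :state, :result, :exception

  def initialize(&block)
    @block = block
    @state = :waiting
    @state_temp = nil
  end

  def pre_execute
    @state = :running
  end

  # The outcome is kept in @state_temp, so observers keep seeing :running
  # until finalize is called.
  def execute
    raise 'call pre_execute first' unless @state == :running

    @state_temp = begin
      @result = @block.call
      :finished
    rescue StandardError => e
      @exception = e
      :exception
    end
  end

  def finalize
    @state = @state_temp
  end
end

task = LifecycleSketch.new { 6 * 7 }
worker = Thread.new do
  task.pre_execute
  task.execute
  task.finalize
end
worker.join
puts task.state  # prints "finished"
puts task.result # prints 42
```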
Checks if the task was stopped or finished. This also includes cases where an exception was raised by the custom code block.
@return [Boolean]
# File lib/utilrb/thread_pool.rb, line 99
def finished?; started? && !running? && !stopping?; end
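Since finished? is defined in terms of three other predicates, it can help to enumerate which of the documented states it accepts. The sketch below re-implements the predicates as free-standing helper methods that take the state as an argument. These helpers are illustrative only and are not part of utilrb.

```ruby
# Documented states, in the order listed for the state attribute.
STATES = %i[waiting running stopping finished terminated exception].freeze

# Free-standing copies of the predicates; they take the state as an
# argument instead of reading an instance variable.
def started?(state)
  state != :waiting
end

def running?(state)
  state == :running
end

def stopping?(state)
  state == :stopping
end

def finished?(state)
  started?(state) && !running?(state) && !stopping?(state)
end

finished_states = STATES.select { |state| finished?(state) }
puts finished_states.inspect # prints [:finished, :terminated, :exception]
```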
Sets the internal state to running. Call execute after that.
# File lib/utilrb/thread_pool.rb, line 172
def pre_execute(pool=nil)
    @mutex.synchronize do
        #store current thread to be able to terminate
        #the thread
        @pool = pool
        @thread = Thread.current
        @started_at = Time.now
        @state = :running
    end
end
Resets the task. This can be used to requeue a task that is already finished.
# File lib/utilrb/thread_pool.rb, line 147
def reset
    if finished? || !started?
        @mutex.synchronize do
            @result = @default
            @state = :waiting
            @exception = nil
            @started_at = nil
            @queued_at = nil
            @stopped_at = nil
        end
    else
        raise RuntimeError,"cannot reset a task which is not finished"
    end
end
Checks if the task is running
@return [Boolean]
# File lib/utilrb/thread_pool.rb, line 87
def running?; @state == :running; end
Checks if the task was started
@return [Boolean]
# File lib/utilrb/thread_pool.rb, line 82
def started?; @state != :waiting; end
Checks if the task is going to be stopped
@return [Boolean]
# File lib/utilrb/thread_pool.rb, line 92
def stopping?; @state == :stopping; end
Checks if the task was successfully finished. This means that no exception, termination or timeout occurred.
@return [Boolean]
# File lib/utilrb/thread_pool.rb, line 105
def successfull?; @state == :finished; end
Terminates the task if it is running
# File lib/utilrb/thread_pool.rb, line 220
def terminate!(exception = Asked)
    @mutex.synchronize do
        return unless running?
        @state = :stopping
        @thread.raise exception
    end
end
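Termination relies on `Thread#raise`, which raises an exception inside another thread. The sketch below shows that mechanism in isolation. `StopRequested` is a hypothetical exception class standing in for the `Asked` constant, and the worker is a plain thread rather than a pool worker.

```ruby
# Hypothetical marker exception, playing the role of the Asked constant.
class StopRequested < StandardError; end

outcome = nil
started = Queue.new

worker = Thread.new do
  begin
    started << true # signal that the work block has been entered
    sleep           # stand-in for long-running work
    outcome = :finished
  rescue StopRequested
    outcome = :terminated
  end
end

started.pop                 # wait until the worker is inside the block
worker.raise(StopRequested) # same mechanism as @thread.raise exception
worker.join
puts outcome # prints "terminated"
```

The exception arrives asynchronously, so the interrupted block gets no chance to finish its current step. Work blocks that hold external resources should therefore release them in an `ensure` clause.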
Checks if the task was terminated.
@return [Boolean]
# File lib/utilrb/thread_pool.rb, line 110
def terminated?; @state == :terminated; end
Returns the number of seconds the task is or was running at the given point in time
@param [Time] time The point in time.
@return [Numeric] the elapsed time in seconds
# File lib/utilrb/thread_pool.rb, line 242
def time_elapsed(time = Time.now)
    #no need to synchronize here
    if @stopped_at
        (@stopped_at-@started_at).to_f
    elsif @started_at
        (time-@started_at).to_f
    else
        0
    end
end
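The three branches correspond to a task that has stopped, a task that is still running, and a task that has not started yet. The sketch below expresses the same logic as a free-standing helper so the cases can be tried with fixed timestamps. `elapsed_seconds` is a hypothetical name that takes the timestamps as arguments instead of reading instance variables.

```ruby
# Hypothetical helper with the same branching as time_elapsed.
def elapsed_seconds(started_at, stopped_at, now = Time.now)
  if stopped_at
    (stopped_at - started_at).to_f # stopped: the given time is ignored
  elsif started_at
    (now - started_at).to_f        # still running at the given time
  else
    0                              # never started
  end
end

t0 = Time.at(1_000)
puts elapsed_seconds(nil, nil)                            # prints 0
puts elapsed_seconds(t0, nil, Time.at(1_002.5))           # prints 2.5
puts elapsed_seconds(t0, Time.at(1_004), Time.at(1_010))  # prints 4.0
```

Once the task has stopped, the time argument no longer influences the result, which is why the last call returns 4.0 rather than 10.0.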