class Utilrb::EventLoop

Simple event loop which supports timers and defers blocking operations to a thread pool those results are queued and being processed by the event loop thread at the end of each step.

All events must be code blocks which will be executed at the end of each step. There is no support for filtering or event propagations.

For an easy integration of ruby classes into the event loop the {Forwardable#def_event_loop_delegator} can be used.

@example Example for using the EventLoop

event_loop = EventLoop.new 
event_loop.once do 
  puts "called once"
end

event_loop.every(1.0) do 
  puts "called every second"
end

callback = Proc.new |result|
  puts result 
end
event_loop.defer callback do
  sleep 2
  "result from the worker thread #{Thread.current}"
end

event_loop.exec

@author Alexander Duda <Alexander.Duda@dfki.de>

Attributes

thread_pool[R]

Underlying thread pool used to defer work.

@return [Utilrb::ThreadPool]

Public Class Methods

cleanup_backtrace(&block) click to toggle source
# File lib/utilrb/event_loop.rb, line 172
def self.cleanup_backtrace(&block)
    block.call
rescue
    $@.delete_if{|s| %r"#{Regexp.quote(__FILE__)}"o =~ s}
    ::Kernel::raise
end
new() click to toggle source

A new EventLoop

# File lib/utilrb/event_loop.rb, line 185
def initialize
    @mutex = Mutex.new
    @events = Queue.new               # stores all events for the next step
    @timers = Set.new                 # stores all timers
    @every_cylce_events = Set.new     # stores all events which are added to @events each step
    @on_error = {}                    # stores on error callbacks
    @errors = Queue.new               # stores errors which will be re raised at the end of the step
    @number_of_events_to_process = 0  # number of events which are processed in the current step
    @thread_pool = ThreadPool.new
    @thread = Thread.current #the event loop thread
    @stop = nil
end

Public Instance Methods

add_event(event,every_step = false) click to toggle source

Adds an Event to the event loop

@param [Event] event The event @param [Boolean] every_step Automatically added for every step

# File lib/utilrb/event_loop.rb, line 642
def add_event(event,every_step = false)
    raise ArgumentError "cannot add event which is ignored." if event.ignore?
    if every_step
        @mutex.synchronize do
            @every_cylce_events << event
        end
    else
        @events << event
    end
    event
end
add_task(task) click to toggle source

Adds a task to the thread pool

@param [ThreadPool::Task] task The task.

# File lib/utilrb/event_loop.rb, line 657
def add_task(task)
    thread_pool << task
end
add_timer(timer) click to toggle source

Adds a timer to the event loop

@param [Timer] timer The timer.

# File lib/utilrb/event_loop.rb, line 631
def add_timer(timer)
    @mutex.synchronize do
        raise "timer #{timer}:#{timer.doc} was already added!" if @timers.include?(timer)
        @timers << timer
    end
end
async(work,*args,&callback) click to toggle source

Integrates a blocking operation call into the EventLoop like {Utilrb::EventLoop#defer} but has a more suitable syntax for deferring a method call

async method(:my_method) do |result,exception|
      if exception
              raise exception
      else
              puts result
      end
end

@param [#call] work The proc which will be deferred @yield [result] The callback @yield [result,exception] The callback @return [Utilrb::ThreadPool::Task] The thread pool task.

# File lib/utilrb/event_loop.rb, line 213
def async(work,*args,&callback)
    async_with_options(work,Hash.new,*args,&callback)
end
async_every(work,options=Hash.new,*args, &callback) click to toggle source

Integrates a blocking operation call like {Utilrb::EventLoop#async} but automatically re queues the call if period was passed and the task was finished by the worker thread. This means it will never re queue the call if the task blocks for ever and it will never simultaneously defer the call to more than one worker thread.

@param [Hash] options The options @option options [Float] :period The period @option options [Boolean] :start Starts the timer right away (default = true) @param [#call] work The proc which will be deferred @param (see defer) @option (see defer) @return [EventLoop::Timer] The thread pool task.

# File lib/utilrb/event_loop.rb, line 252
def async_every(work,options=Hash.new,*args, &callback)
    options, async_opt = Kernel.filter_options(options,:period,:start => true)
    period = options[:period]
    raise ArgumentError,"No period given" unless period
    task = nil
    every period ,options[:start] do
        if !task
            task = async_with_options(work,async_opt,*args,&callback)
        elsif task.finished?
            add_task task
        end
        task
    end
end
async_with_options(work,options=Hash.new,*args,&callback) click to toggle source

(see async) @param [Hash] options The options @option (see defer)

# File lib/utilrb/event_loop.rb, line 220
def async_with_options(work,options=Hash.new,*args,&callback)
    options[:callback] = callback
    defer(options,*args,&work)
end
backlog() click to toggle source

(see ThreadPool#backlog)

# File lib/utilrb/event_loop.rb, line 568
def backlog
    thread_pool.backlog
end
call(&block) click to toggle source
Calls the give block in the event loop thread. If the current thread
is the event loop thread it will execute it right a way and returns
the result of the code block call. Otherwise, it returns an handler to 
the Event which was queued.

@return [Event,Object]

# File lib/utilrb/event_loop.rb, line 375
def call(&block)
    if thread?
        block.call
    else
        once(&block)
    end
end
cancel_timer(timer) click to toggle source

Cancels the given timer if it is running otherwise it does nothing.

@param [Timer] timer The timer

# File lib/utilrb/event_loop.rb, line 492
def cancel_timer(timer)
    @mutex.synchronize do
        @timers.delete timer
    end
end
clear() click to toggle source

Clears all timers, events and errors

# File lib/utilrb/event_loop.rb, line 662
def clear
    thread_pool.clear

    @errors.clear
    @events.clear
    @mutex.synchronize do
        @every_cylce_events.clear
        @timers.clear
    end
end
clear_errors() click to toggle source

Clears all errors which occurred during the last step and are not marked as known If the errors were not cleared they are re raised the next time step is called.

# File lib/utilrb/event_loop.rb, line 675
def clear_errors
    @errors.clear
end
defer(options=Hash.new,*args,&block) click to toggle source

Integrates a blocking operation call into the EventLoop by executing it from a different thread. The given callback will be called from the EventLoop thread while processing its events after the call returned.

If the callback has an arity of 2 the exception will be passed to the callback as second parameter in an event of an error. The error is also passed to the error handlers of the even loop, but it will not be re raised if the error is marked as known

To overwrite an error the callback can return :ignore_error or a new instance of an error in an event of an error. In this case the error handlers of the event loop will not be called or called with the new error instance.

@example ignore a error callback = Proc.new do |r,e|

   if e
      :ignore_error
   else
      puts r
   end
end

defer({:callback => callback}) do

raise

end

@param [Hash] options The options @option (see ThreadPool::Task#initialize) @option options [Proc] :callback The callback @option options [class] :known_errors Known erros which will be rescued @option options [Proc] :on_error Callback which is called when an error occured

@param (see ThreadPool::Task#initialize) @return [ThreadPool::Task] The thread pool task.

# File lib/utilrb/event_loop.rb, line 302
def defer(options=Hash.new,*args,&block)
    options, task_options = Kernel.filter_options(options,{:callback => nil,:known_errors => [],:on_error => nil})
    callback = options[:callback]
    error_callback = options[:on_error]
    known_errors = Array(options[:known_errors])

    task = Utilrb::ThreadPool::Task.new(task_options,*args,&block)
    # ensures that user callback is called from main thread and not from worker threads
    if callback
        task.callback do |result,exception|
            once do
                if callback.arity == 1
                    callback.call result if !exception
                else
                    e = callback.call result,exception
                    #check if the error was overwritten in the
                    #case of an error
                    exception = if exception
                                    if e.is_a?(Symbol) && e == :ignore_error
                                        nil
                                    elsif e.is_a? Exception
                                        e
                                    else
                                        exception
                                    end
                                end
                end
                if exception
                    error_callback.call(exception) if error_callback
                    raises = !known_errors.any? {|error| exception.is_a?(error)}
                    handle_error(exception,raises)
                end
            end
        end
    else
        task.callback do |result,exception|
            if exception
                raises = !known_errors.find {|error| exception.is_a?(error)}
                once do
                    error_callback.call(exception) if error_callback
                    handle_error(exception,raises)
                end
            end
        end
    end
    @mutex.synchronize do
        @thread_pool << task
    end
    task
end
events?() click to toggle source

Returns true if events are queued.

@return [Boolean]

# File lib/utilrb/event_loop.rb, line 386
def events?
    !@events.empty? || !@errors.empty?
end
every(period,start=true,&block) click to toggle source

Adds a timer to the event loop which will execute the given code block with the given period from the event loop thread.

@param [Float] period The period of the timer in seconds @parma [Boolean] start Startet the timerright away. @yield The code block. @return [Utilrb::EventLoop::Timer]

# File lib/utilrb/event_loop.rb, line 398
def every(period,start=true,&block)
    timer = Timer.new(self,period,&block)
    timer.start if start # adds itself to the event loop
    timer
end
every_step(&block) click to toggle source

Executes the given block every step from the event loop thread.

@return [Event] The event

# File lib/utilrb/event_loop.rb, line 407
def every_step(&block)
    add_event Event.new(block),true
end
exec(period=0.05,&block) click to toggle source

Starts the event loop with the given period. If a code block is given it will be executed at the end of each step. This method will block until stop is called

@param [Float] period The period @yield The code block

# File lib/utilrb/event_loop.rb, line 516
def exec(period=0.05,&block)
    @stop = false
    reset_timers
    while !@stop
        last_step = Time.now
        step(last_step,&block)
        diff = (Time.now-last_step).to_f
        sleep(period-diff) if diff < period && !@stop
    end
end
handle_error(error,save = true) click to toggle source
# File lib/utilrb/event_loop.rb, line 679
def handle_error(error,save = true)
    call do
        on_error = @mutex.synchronize do
            @on_error.find_all{|key,e| error.is_a? key}.map(&:last).flatten
        end
        on_error.each do |handler|
            handler.call error
        end
        @errors << error if save == true
    end
end
on_error(error_class,&block) click to toggle source

Errors caught during event loop callbacks are forwarded to registered code blocks. The code block is called from the event loop thread.

@param @error_class The error class the block should be called for @yield [exception] The code block

# File lib/utilrb/event_loop.rb, line 417
def on_error(error_class,&block)
    @mutex.synchronize do
        @on_error[error_class] ||= []
        @on_error[error_class]  << block
    end
end
on_errors(*error_classes,&block) click to toggle source

Errors caught during event loop callbacks are forwarded to registered code blocks. The code blocks are called from the event loop thread.

@param @error_classes The error classes the block should be called for @yield [exception] The code block

# File lib/utilrb/event_loop.rb, line 430
def on_errors(*error_classes,&block)
    error_classes.flatten!
    error_classes.each do |error_class|
        on_error(error_class,&block)
    end
end
once(delay=nil,&block) click to toggle source

Executes the given block in the next step from the event loop thread. Returns a Timer object if a delay is set otherwise an handler to the Event which was queued.

@yield [] The code block. @return [Utilrb::EventLoop::Timer,Event]

# File lib/utilrb/event_loop.rb, line 359
def once(delay=nil,&block)
    raise ArgumentError "no block given" unless block
    if delay && delay > 0
        timer = Timer.new(self,delay,true,&block)
        timer.start(timer.period, false)
    else
        add_event(Event.new(block))
    end
end
reraise_error(error) click to toggle source
# File lib/utilrb/event_loop.rb, line 577
def reraise_error(error)
    raise error, error.message, (error.backtrace || []) + caller(1)
end
reset_timers(time = Time.now) click to toggle source

Resets all timers to fire not before their hole period is passed counting from the given point in time.

@param [Time] time The time

# File lib/utilrb/event_loop.rb, line 502
def reset_timers(time = Time.now)
    @mutex.synchronize do 
        @timers.each do |timer|
            timer.reset time
        end
    end
end
shutdown() click to toggle source

Shuts down the thread pool

# File lib/utilrb/event_loop.rb, line 573
def shutdown()
    thread_pool.shutdown()
end
step(time = Time.now,&block) click to toggle source

Handles all current events and timers. If a code block is given it will be executed at the end.

@param [Time] time The time the step is executed for. @yield The code block

# File lib/utilrb/event_loop.rb, line 586
def step(time = Time.now,&block)
    validate_thread
    reraise_error(@errors.shift) if !@errors.empty?

    #copy all work otherwise it would not be allowed to
    #call any event loop functions from a timer
    timers,call = @mutex.synchronize do
                            @every_cylce_events.delete_if(&:ignore?)
                            @every_cylce_events.each do |event|
                                add_event event
                            end

                            # check all timers
                            temp_timers = @timers.find_all do |timer|
                                timer.timeout?(time)
                            end
                            # delete single shot timer which elapsed
                            @timers -= temp_timers.find_all(&:single_shot?)
                            [temp_timers,block]
                        end

    # handle all current events but not the one which are added during processing.
    # Step is recursively be called if wait_for is used insight an event code block.
    # To make sure that all events and timer are processed in the right order
    # @number_of_events_to_process and a second timeout check is used.
    @number_of_events_to_process = [@events.size,@number_of_events_to_process].max
    while @number_of_events_to_process > 0
        event = @events.pop
        @number_of_events_to_process -= 1
        handle_errors{event.call} unless event.ignore?
    end
    timers.each do |timer|
        next if timer.stopped?
        handle_errors{timer.call(time)} if timer.timeout?(time)
    end
    handle_errors{call.call} if call
    reraise_error(@errors.shift) if !@errors.empty?
    
    #allow thread pool to take over
    Thread.pass
end
steps(period = 0.05,max_time=1.0,&block) click to toggle source

Steps with the given period until all worker thread are waiting for work

@param [Float] period Ther period @param (@see step)

# File lib/utilrb/event_loop.rb, line 555
def steps(period = 0.05,max_time=1.0,&block)
    start = Time.now
    begin
        last_step = Time.now
        step(last_step,&block)
        time = Time.now
        break if max_time && max_time <= (time-start).to_f
        diff = (time-last_step).to_f
        sleep(period-diff) if diff < period && !@stop
    end while (thread_pool.process? || events?)
end
stop() click to toggle source

Stops the EventLoop after [#exec] was called.

# File lib/utilrb/event_loop.rb, line 528
def stop
    @stop = true
end
sync(sync_key,*args,&block) click to toggle source

(see ThreadPool#sync)

# File lib/utilrb/event_loop.rb, line 226
def sync(sync_key,*args,&block)
    thread_pool.sync(sync_key,*args,&block)
end
sync_timeout(sync_key,timeout,*args,&block) click to toggle source

(see ThreadPool#sync_timeout)

# File lib/utilrb/event_loop.rb, line 231
def sync_timeout(sync_key,timeout,*args,&block)
    thread_pool.sync_timeout(sync_key,timeout,*args,&block)
end
thread=(thread) click to toggle source

Sets the event loop thread. By default it is set to the one the EventLoop was started from.

@param thread The thread

# File lib/utilrb/event_loop.rb, line 463
def thread=(thread)
    @mutex.synchronize do
        @thread = thread
    end
end
thread?() click to toggle source

Returns true if the current thread is the event loop thread.

@return [Boolean]

# File lib/utilrb/event_loop.rb, line 449
def thread?
    @mutex.synchronize do
        if Thread.current == @thread
            true
        else
            false
        end
    end
end
timer?(timer) click to toggle source

Returns true if the given timer is running.

@param [Timer] timer The timer. @return [Boolean]

# File lib/utilrb/event_loop.rb, line 473
def timer?(timer)
    @mutex.synchronize do
        @timers.include? timer
    end
end
timers() click to toggle source

Returns all currently running timers.

@return Array<Timer>

# File lib/utilrb/event_loop.rb, line 482
def timers
    @mutex.synchronize do
        @timers.dup
    end
end
validate_thread() click to toggle source

Raises if the current thread is not the event loop thread (by default the one the event loop was started from).

@raise [RuntimeError]

# File lib/utilrb/event_loop.rb, line 441
def validate_thread
    raise "current thread is not the event loop thread" if !thread?
end
wait_for(period=0.05,timeout=nil,&block) click to toggle source

Steps with the given period until the given block returns true.

@param [Float] period The period @param [Float] timeout The timeout in seconds @yieldreturn [Boolean]

# File lib/utilrb/event_loop.rb, line 538
def wait_for(period=0.05,timeout=nil,&block)
    start = Time.now
    old_stop = @stop
    exec period do
        stop if block.call
        if timeout && timeout <= (Time.now-start).to_f
            raise RuntimeError,"Timeout during wait_for"
        end
    end
    @stop = old_stop
end

Private Instance Methods

handle_errors(&block) click to toggle source

Calls the given block and rescues all errors which can be handled by the added error handler. If an error cannot be handled it is stored and re raised after all events and timers are processed. If more than one error occurred which cannot be handled they are stored until the next step is called and re raised until all errors are processed.

@info This method must be called from the event loop thread, otherwise

all error handlers would be called from the wrong thread

@yield The code block. @see error_handler

# File lib/utilrb/event_loop.rb, line 703
def handle_errors(&block)
    block.call
rescue Exception => e
    handle_error(e,true)
end