class Mmtrix::Agent::EventLoop

Public Class Methods

new() click to toggle source
# File lib/mmtrix/agent/event_loop.rb, line 57
def initialize
  @self_pipe_rd, @self_pipe_wr = IO.pipe
  @event_queue = Queue.new
  @stopped     = false
  @timers      = {}

  @subscriptions = Hash.new { |h,k| h[k] = [] }
  @subscriptions[:__add_timer] << Proc.new { |t| set_timer(t) }
  @subscriptions[:__add_event] << Proc.new { |e, blk| @subscriptions[e] << blk }
end

Public Instance Methods

dispatch_event(event, args) click to toggle source
# File lib/mmtrix/agent/event_loop.rb, line 142
def dispatch_event(event, args)
  Mmtrix::Agent.logger.debug("EventLoop: Dispatching event '#{event}' with #{@subscriptions[event].size} callback(s).")

  errors = []
  @subscriptions[event].each do |s|
    begin
      s.call(*args)
    rescue Mmtrix::Agent::ForceRestartException, Mmtrix::Agent::ForceDisconnectException
      raise
    rescue => e
      errors << e
    end
  end

  if !errors.empty?
    ::Mmtrix::Agent.logger.error "#{errors.size} error(s) running task for event '#{event}' in Agent Event Loop:", *errors
  end
end
fire(event, *args) click to toggle source
# File lib/mmtrix/agent/event_loop.rb, line 169
def fire(event, *args)
  @event_queue << [event, args]
  wakeup
end
fire_after(interval, event) click to toggle source
# File lib/mmtrix/agent/event_loop.rb, line 179
def fire_after(interval, event)
  ::Mmtrix::Agent.logger.debug "Firing event #{event} after #{interval} seconds."
  fire(:__add_timer, Timer.new(interval, event, false))
end
fire_every(interval, event) click to toggle source
# File lib/mmtrix/agent/event_loop.rb, line 174
def fire_every(interval, event)
  ::Mmtrix::Agent.logger.debug "Firing event #{event} every #{interval} seconds."
  fire(:__add_timer, Timer.new(interval, event, true))
end
fire_timer(timer) click to toggle source
# File lib/mmtrix/agent/event_loop.rb, line 131
def fire_timer(timer)
  if timer.due?
    @event_queue << [timer.event]
    timer.set_fired_time
  end
end
fire_timers() click to toggle source
# File lib/mmtrix/agent/event_loop.rb, line 125
def fire_timers
  @timers.each do |event, timer|
    fire_timer(timer)
  end
end
next_timeout() click to toggle source
# File lib/mmtrix/agent/event_loop.rb, line 81
def next_timeout
  return nil if @timers.empty?
  timeout = @timers.values.map(&:next_fire_time).min - Time.now
  timeout < 0 ? 0 : timeout
end
on(event, &blk) click to toggle source
# File lib/mmtrix/agent/event_loop.rb, line 165
def on(event, &blk)
  fire(:__add_event, event, blk)
end
prune_timers() click to toggle source
# File lib/mmtrix/agent/event_loop.rb, line 138
def prune_timers
  @timers.delete_if { |e, t| t.finished? }
end
reschedule_timer_for_event(e) click to toggle source
# File lib/mmtrix/agent/event_loop.rb, line 161
def reschedule_timer_for_event(e)
  @timers[e].reschedule if @timers[e]
end
run() click to toggle source
# File lib/mmtrix/agent/event_loop.rb, line 96
def run
  ::Mmtrix::Agent.logger.debug "Running event loop"
  while !stopped?
    run_once
  end
end
run_once(nonblock=false) click to toggle source
# File lib/mmtrix/agent/event_loop.rb, line 103
def run_once(nonblock=false)
  wait_to_run(nonblock)

  prune_timers
  fire_timers

  until @event_queue.empty?
    evt, args = @event_queue.pop
    dispatch_event(evt, args)
    reschedule_timer_for_event(evt)
  end
end
set_timer(timer) click to toggle source
# File lib/mmtrix/agent/event_loop.rb, line 68
def set_timer(timer)
  existing_timer = @timers[timer.event]

  if existing_timer
    elapsed_interval = Time.now - existing_timer.last_interval_start
    timer.advance(elapsed_interval)
  end

  @timers[timer.event] = timer

  fire_timer(timer)
end
stop() click to toggle source
# File lib/mmtrix/agent/event_loop.rb, line 91
def stop
  @stopped = true
  wakeup
end
stopped?() click to toggle source
# File lib/mmtrix/agent/event_loop.rb, line 87
def stopped?
  @stopped
end
wait_to_run(nonblock) click to toggle source
# File lib/mmtrix/agent/event_loop.rb, line 116
def wait_to_run(nonblock)
  timeout = nonblock ? 0 : next_timeout
  ready = IO.select([@self_pipe_rd], nil, nil, timeout)

  if ready && ready[0] && ready[0][0] && ready[0][0] == @self_pipe_rd
    @self_pipe_rd.read(1)
  end
end
wakeup() click to toggle source
# File lib/mmtrix/agent/event_loop.rb, line 184
def wakeup
  begin
    @self_pipe_wr.write_nonblock '.'
  rescue Errno::EAGAIN
    ::Mmtrix::Agent.logger.debug "Failed to wakeup event loop"
  end
end