class Rutema::Dispatcher

The Rutema::Dispatcher functions as a demultiplexer between Rutema::Engine and the various reporters.

In stream mode the incoming queue is popped periodically and the messages are distributed to the queues of any subscribed event reporters.

By default this includes Rutema::Reporters::Collector, which is then used at the end of a run to provide the collected data to all registered block mode reporters.

Constants

INTERVAL

The interval between queue operations

Public Class Methods

new(queue,configuration) click to toggle source
# File lib/rutema/core/engine.rb, line 158
# Builds the dispatcher around the incoming +queue+ and instantiates the
# reporters listed in +configuration+.
#
# queue::         the incoming message queue that dispatch reads from
# configuration:: must respond to +reporters+; when that is truthy its values
#                 are reporter definitions (hashes with a +:class+ entry)
#
# Reporters::Summary is skipped here because #report always builds one itself.
# A reporter that responds to +update+ is treated as a streaming reporter, one
# that responds to +report+ as a block reporter; an instance may be both.
# The collector is always appended to the streaming reporters.
def initialize(queue, configuration)
  @queue = queue
  @queues = {}
  @streaming_reporters = []
  @block_reporters = []
  @collector = Rutema::Reporters::Collector.new(nil, self)
  reporter_definitions = configuration.reporters
  if reporter_definitions
    wanted = reporter_definitions.values.reject { |definition| definition[:class] == Reporters::Summary }
    instances = wanted.map { |definition| instantiate_reporter(definition, configuration) }.compact
    @streaming_reporters = instances.select { |reporter| reporter.respond_to?(:update) }
    @block_reporters = instances.select { |reporter| reporter.respond_to?(:report) }
  end
  @streaming_reporters << @collector
  @configuration = configuration
end

Public Instance Methods

exit() click to toggle source
# File lib/rutema/core/engine.rb, line 195
# Shuts the dispatcher down. Does nothing (beyond the debug message) unless
# the dispatcher thread was started.
#
# Order matters: pending messages are flushed first, then every streaming
# reporter is told to exit, and only then is the dispatcher thread killed.
#
# Returns the killed thread, or nil when no thread was running.
def exit
  puts "Exiting main dispatcher" if $DEBUG
  return unless @thread

  flush
  @streaming_reporters.each(&:exit)
  Thread.kill(@thread)
end
report(specs) click to toggle source
# File lib/rutema/core/engine.rb, line 189
# Hands the collected run data to every block reporter and finally to a
# freshly built Reporters::Summary.
#
# specs:: passed through unchanged to each reporter's +report+
#
# Each reporter receives +specs+ together with the collector's +states+ and
# +errors+. Returns whatever the summary reporter's +report+ returns.
def report(specs)
  @block_reporters.each { |reporter| reporter.report(specs, @collector.states, @collector.errors) }
  summary = Reporters::Summary.new(@configuration, self)
  summary.report(specs, @collector.states, @collector.errors)
end
run!() click to toggle source
# File lib/rutema/core/engine.rb, line 178
# Starts every streaming reporter and then spawns the dispatcher thread,
# which calls dispatch and sleeps for INTERVAL, forever.
#
# Returns the newly created thread (also kept in @thread).
def run!
  puts "Running #{@streaming_reporters.size} streaming reporters" if $DEBUG
  @streaming_reporters.each(&:run!)
  @thread = Thread.new do
    # Deliberately a plain while loop: Kernel#loop would silently swallow
    # StopIteration (and its subclasses) raised from dispatch.
    while true
      dispatch
      sleep INTERVAL
    end
  end
end
subscribe(identifier) click to toggle source

Call this to establish a queue with the given identifier

# File lib/rutema/core/engine.rb, line 173
# Registers a fresh Queue under +identifier+ and returns it.
#
# Subscribing again with the same identifier replaces the previous queue.
def subscribe(identifier)
  @queues[identifier] = Queue.new
end

Private Instance Methods

dispatch() click to toggle source
# File lib/rutema/core/engine.rb, line 220
# Takes at most one message off the incoming queue and pushes it onto every
# subscribed queue. Falsy messages (nil/false) are dropped.
#
# The pop is non-blocking on purpose: dispatch is called both from the thread
# started in run! and, via flush, from whichever thread calls exit. With a
# "check size, then blocking pop" sequence one caller can see a message, lose
# it to the other caller and then block forever on the empty queue.
#
# Returns the subscriber hash when a message was distributed, nil otherwise.
def dispatch
  data = begin
    @queue.pop(true)
  rescue ThreadError
    # Queue was empty (or emptied by the other consumer): nothing to do.
    nil
  end
  @queues.each_value { |subscriber| subscriber.push(data) } if data
end
flush() click to toggle source
# File lib/rutema/core/engine.rb, line 204
# Drains the incoming queue by calling dispatch until it reports no more
# messages, sleeping INTERVAL after each call. Only does so when the
# dispatcher thread has been started; otherwise it is a no-op apart from the
# debug message. Always returns nil.
def flush
  puts "Flushing queues" if $DEBUG
  return unless @thread

  until @queue.size.zero?
    dispatch
    sleep INTERVAL
  end
end
instantiate_reporter(definition,configuration) click to toggle source
# File lib/rutema/core/engine.rb, line 213
# Creates a reporter from a reporter +definition+.
#
# definition::    indexable by +:class+; the value is the reporter class
# configuration:: passed as first argument to the reporter's constructor
#
# The dispatcher itself is passed as the second constructor argument.
# Returns the new reporter, or nil when the definition has no (truthy) class.
def instantiate_reporter(definition, configuration)
  reporter_class = definition[:class]
  return nil unless reporter_class

  reporter_class.new(configuration, self)
end