class Rutema::Dispatcher
The Rutema::Dispatcher
functions as a demultiplexer between Rutema::Engine
and the various reporters.
In stream mode the incoming queue is popped periodically and the messages are destributed to the queues of any subscribed event reporters.
By default this includes Rutema::Reporters::Collector
which is then used at the end of a run to provide the collected data to all registered block mode reporters
Constants
- INTERVAL
The interval between queue operations
Public Class Methods
new(queue,configuration)
click to toggle source
# File lib/rutema/core/engine.rb, line 158 def initialize queue,configuration @queue = queue @queues = {} @streaming_reporters=[] @block_reporters=[] @collector=Rutema::Reporters::Collector.new(nil,self) if configuration.reporters instances=configuration.reporters.values.map{|v| instantiate_reporter(v,configuration) if v[:class] != Reporters::Summary}.compact @streaming_reporters,_=instances.partition{|rep| rep.respond_to?(:update)} @block_reporters,_=instances.partition{|rep| rep.respond_to?(:report)} end @streaming_reporters<<@collector @configuration=configuration end
Public Instance Methods
exit()
click to toggle source
# File lib/rutema/core/engine.rb, line 195 def exit puts "Exiting main dispatcher" if $DEBUG if @thread flush @streaming_reporters.each {|r| r.exit} Thread.kill(@thread) end end
report(specs)
click to toggle source
# File lib/rutema/core/engine.rb, line 189 def report specs @block_reporters.each do |r| r.report(specs,@collector.states,@collector.errors) end Reporters::Summary.new(@configuration,self).report(specs,@collector.states,@collector.errors) end
run!()
click to toggle source
# File lib/rutema/core/engine.rb, line 178 def run! puts "Running #{@streaming_reporters.size} streaming reporters" if $DEBUG @streaming_reporters.each {|r| r.run!} @thread=Thread.new do while true do dispatch() sleep INTERVAL end end end
subscribe(identifier)
click to toggle source
Call this to establish a queue with the given identifier
# File lib/rutema/core/engine.rb, line 173 def subscribe identifier @queues[identifier]=Queue.new return @queues[identifier] end
Private Instance Methods
dispatch()
click to toggle source
# File lib/rutema/core/engine.rb, line 220 def dispatch if @queue.size>0 data=@queue.pop @queues.each{ |i,q| q.push(data) } if data end end
flush()
click to toggle source
# File lib/rutema/core/engine.rb, line 204 def flush puts "Flushing queues" if $DEBUG if @thread while @queue.size>0 do dispatch() sleep INTERVAL end end end
instantiate_reporter(definition,configuration)
click to toggle source
# File lib/rutema/core/engine.rb, line 213 def instantiate_reporter definition,configuration if definition[:class] klass=definition[:class] return klass.new(configuration,self) end return nil end