class Arborist::MonitorRunner
An event-driven runner for Arborist::Monitors.
Constants
- QUEUE_SIGS
Signals the runner handles
- THREAD_CLEANUP_INTERVAL
Number of seconds between thread cleanup passes
Attributes
- client
The Arborist::Client that will provide the message packing and unpacking.
- handler
The ZMQ::Handler subclass that handles all async IO.
- monitors
The Array of loaded Arborist::Monitors the runner should run.
- reactor
The reactor (a CZTop::Reactor) the runner uses to drive everything.
- request_queue
The Hash of pending requests, keyed by the callback that should be called with the results.
- runner_threads
A Hash of monitor object -> thread, used to contain and track running monitor threads.
Public Class Methods
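Create a new Arborist::MonitorRunner with no monitors loaded, a new CZTop::Reactor and Arborist::Client, and empty thread and request tables.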
    # File lib/arborist/monitor_runner.rb, line 33
    def initialize
      @monitors = []
      @handler = nil
      @reactor = CZTop::Reactor.new
      @client = Arborist::Client.new
      @runner_threads = {}
      @request_queue = {}
    end
Public Instance Methods
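Create a repeating reactor timer that runs the specified monitor on its interval, skipping any tick on which a thread for the monitor's previous run is still tracked in runner_threads.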
    # File lib/arborist/monitor_runner.rb, line 241
    def add_interval_timer_for( monitor )
      interval = monitor.interval
      self.log.info "Creating timer for %p" % [ monitor ]
      return self.reactor.add_periodic_timer( interval ) do
        unless self.runner_threads.key?( monitor )
          self.run_monitor( monitor )
        end
      end
    end
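The guard inside the timer block means a tick does nothing while the monitor still has an entry in runner_threads. A minimal, self-contained sketch of that guard in plain Ruby, assuming a hypothetical `GuardSketch#tick` method standing in for the periodic timer's block and a thread blocked on a Queue standing in for a slow monitor run:

```ruby
# Hypothetical stand-in for the periodic timer block in
# add_interval_timer_for: a tick starts a run only if the monitor has no
# thread tracked in runner_threads.
class GuardSketch
  attr_reader :runner_threads, :runs

  def initialize
    @runner_threads = {}
    @runs = 0
  end

  # One timer tick. The started thread blocks on +release+, simulating a
  # monitor run that outlasts the timer interval.
  def tick( monitor, release )
    return :skipped if runner_threads.key?( monitor )
    @runs += 1
    runner_threads[ monitor ] = Thread.new { release.pop }
    :started
  end
end

guard = GuardSketch.new
release = Queue.new

first  = guard.tick( :ping, release )       # starts a run
second = guard.tick( :ping, release )       # previous run is still tracked
release << :done
guard.runner_threads.delete( :ping ).join   # what the cleanup pass does later
third  = guard.tick( :ping, release )       # nothing tracked, so it runs again
release << :done
guard.runner_threads.values.each( &:join )

p [ first, second, third ]  # => [:started, :skipped, :started]
```

Because the guard tests for the hash key rather than `alive?`, a run that has already finished still suppresses new ticks until cleanup_monitor_threads removes its entry.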
Create a one-shot reactor timer that registers the interval timer for the specified monitor after a random delay of less than its splay, in seconds.
    # File lib/arborist/monitor_runner.rb, line 255
    def add_splay_timer_for( monitor )
      delay = rand( monitor.splay )
      self.log.debug "Splaying registration of %p for %ds" % [ monitor, delay ]
      self.reactor.add_oneshot_timer( delay ) do
        self.add_interval_timer_for( monitor )
      end
    end
Set up a periodic timer that cleans up finished monitor threads every THREAD_CLEANUP_INTERVAL seconds.
    # File lib/arborist/monitor_runner.rb, line 266
    def add_thread_cleanup_timer
      self.log.debug "Starting thread cleanup timer for %p." % [ self.runner_threads ]
      self.reactor.add_periodic_timer( THREAD_CLEANUP_INTERVAL ) do
        self.cleanup_monitor_threads
      end
    end
Register a timer for the specified monitor: a splay timer if its splay is nonzero, otherwise an interval timer directly.
    # File lib/arborist/monitor_runner.rb, line 231
    def add_timer_for( monitor )
      if monitor.splay.nonzero?
        self.add_splay_timer_for( monitor )
      else
        self.add_interval_timer_for( monitor )
      end
    end
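To see how splay offsets a monitor's schedule, here is a sketch against a simulated clock. `FakeReactor`, `FakeMonitor`, and the standalone `add_timer_for` are hypothetical stand-ins, not the CZTop::Reactor API; the sketch also assumes a periodic timer first fires one interval after it is added, which may differ from the real reactor.

```ruby
# Hypothetical simulated-clock reactor; the real runner uses CZTop::Reactor.
# Assumption: a periodic timer first fires one interval after it is added.
class FakeReactor
  Timer = Struct.new( :at, :interval, :block )

  attr_reader :now

  def initialize
    @now = 0.0
    @timers = []
  end

  def add_oneshot_timer( delay, &block )
    @timers << Timer.new( @now + delay, nil, block )
  end

  def add_periodic_timer( interval, &block )
    @timers << Timer.new( @now + interval, interval, block )
  end

  # Fire every timer that falls due up to +time+, earliest first.
  def advance_to( time )
    while ( timer = @timers.select { |t| t.at <= time }.min_by( &:at ) )
      @now = timer.at
      if timer.interval
        timer.at += timer.interval                    # reschedule a periodic timer
      else
        @timers.delete_if { |t| t.equal?( timer ) }   # one-shot timers fire once
      end
      timer.block.call
    end
    @now = time
  end
end

FakeMonitor = Struct.new( :name, :interval, :splay )

# Mirrors add_timer_for: a splayed monitor gets a one-shot timer that adds
# its interval timer after a random delay below the splay.
def add_timer_for( reactor, monitor, rng, log )
  add_interval = lambda do
    reactor.add_periodic_timer( monitor.interval ) { log << [ monitor.name, reactor.now ] }
  end
  if monitor.splay.nonzero?
    reactor.add_oneshot_timer( rng.rand( monitor.splay ) ) { add_interval.call }
  else
    add_interval.call
  end
end

reactor = FakeReactor.new
rng = Random.new( 1 )
log = []
add_timer_for( reactor, FakeMonitor.new( :ping, 30, 0 ), rng, log )
add_timer_for( reactor, FakeMonitor.new( :http, 30, 10 ), rng, log )
reactor.advance_to( 100 )

ping_times = log.select { |name, _| name == :ping }.map( &:last )
http_times = log.select { |name, _| name == :http }.map( &:last )
p ping_times  # => [30.0, 60.0, 90.0]
p http_times  # shifted from ping's schedule by the random splay delay
```

The splayed `:http` monitor still runs every 30 simulated seconds, but its first run is pushed back by the random delay, so monitors sharing an interval don't all fire in lockstep.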
Clean up any monitor runner threads that are dead: remove each one from runner_threads and join it, logging any exception the thread raised.
    # File lib/arborist/monitor_runner.rb, line 277
    def cleanup_monitor_threads
      self.runner_threads.values.reject( &:alive? ).each do |thr|
        monitor = self.runner_threads.key( thr )
        self.runner_threads.delete( monitor )
        begin
          thr.join
        rescue => err
          self.log.error "%p while running %s: %s" % [ err.class, thr[:monitor_desc], err.message ]
        end
      end
    end
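Thread#join re-raises the exception that killed a thread, which is why the cleanup pass joins dead threads inside a begin/rescue. A self-contained sketch of the same pass, assuming an Array of messages in place of the logger; unlike the original, it reports the hash key rather than the :monitor_desc thread variable.

```ruby
# Silence Ruby's default "terminated with exception" report for the demo.
Thread.report_on_exception = false

# Same shape as cleanup_monitor_threads: drop dead threads from the
# tracking hash, then join each so its exception surfaces here.
def cleanup_monitor_threads( runner_threads, errors )
  runner_threads.values.reject( &:alive? ).each do |thr|
    monitor = runner_threads.key( thr )
    runner_threads.delete( monitor )
    begin
      thr.join   # re-raises whatever the thread died with
    rescue => err
      errors << "%p while running %s: %s" % [ err.class, monitor, err.message ]
    end
  end
end

gate = Queue.new
threads = {
  ok:      Thread.new { :fine },
  crashed: Thread.new { raise "boom" },
  busy:    Thread.new { gate.pop }   # still running during cleanup
}
# Wait until the first two threads have finished.
sleep 0.01 while threads[:ok].alive? || threads[:crashed].alive?

errors = []
cleanup_monitor_threads( threads, errors )
p threads.keys  # => [:busy]
p errors        # => ["RuntimeError while running crashed: boom"]

gate << :done
threads[:busy].join
```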
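Reactor callback: handle the client's tree API socket becoming writable by sending the oldest queued request and passing the response to its callback. Write events are disabled once the queue is empty; any other kind of event raises an error.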
    # File lib/arborist/monitor_runner.rb, line 110
    def handle_io_event( event )
      if event.writable?
        if (( pair = self.request_queue.shift ))
          callback, request = *pair
          res = self.client.send_tree_api_request( request )
          callback.call( res )
        end
        self.unregister if self.request_queue.empty?
      else
        raise "Unexpected %p on the tree API socket" % [ event ]
      end
    end
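Because Ruby Hashes preserve insertion order, Hash#shift hands back the oldest callback/request pair, so the request queue behaves as a FIFO with one request sent per writable event. A minimal sketch of that discipline, assuming a hypothetical `RequestQueueSketch` class whose `send_request` block stands in for client.send_tree_api_request and whose `writable` flag models the reactor's write registration:

```ruby
# Hypothetical model of queue_request + handle_io_event without ZeroMQ.
class RequestQueueSketch
  attr_reader :writable

  def initialize( &send_request )
    @request_queue = {}   # callback => request, oldest first
    @send_request = send_request
    @writable = false
  end

  # Like queue_request: store the request under its callback, then
  # ask to be told when the socket is writable.
  def queue_request( request, &callback )
    @request_queue[ callback ] = request
    @writable = true
  end

  # Like one writable event: send only the oldest request.
  def handle_writable
    if ( pair = @request_queue.shift )
      callback, request = *pair
      callback.call( @send_request.call( request ) )
    end
    @writable = false if @request_queue.empty?
  end
end

responses = []
q = RequestQueueSketch.new { |req| "reply to #{req}" }
q.queue_request( "search" ) { |res| responses << res }
q.queue_request( "update" ) { |res| responses << res }

q.handle_writable while q.writable
p responses  # => ["reply to search", "reply to update"]
```

One consequence of keying the queue by callback: queuing a second request with the very same Proc object replaces the first request instead of adding another.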
Load monitors from the specified enumerator.
    # File lib/arborist/monitor_runner.rb, line 74
    def load_monitors( enumerator )
      self.monitors.concat( enumerator.to_a )
    end
Add the specified request to the request queue, keyed by the callback that should be called with its response, and register the client's socket for writing.
    # File lib/arborist/monitor_runner.rb, line 203
    def queue_request( request, &callback )
      self.request_queue[ callback ] = request
      self.register
    end
Enable write events for the client's tree API socket on the reactor, if they aren't already enabled.
    # File lib/arborist/monitor_runner.rb, line 216
    def register
      # self.log.debug "Registering for writing."
      self.reactor.enable_events( self.client.tree_api, :write ) unless self.registered?
    end
Returns true if the runner's client socket is currently registered for writing.
    # File lib/arborist/monitor_runner.rb, line 210
    def registered?
      return self.reactor.event_enabled?( self.client.tree_api, :write )
    end
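The registered? check makes register and unregister idempotent: queuing several requests in a row enables write events once, and draining the queue disables them once. A sketch with a hypothetical `RecordingReactor` that only mirrors the three reactor calls used above and logs each enable/disable it receives:

```ruby
# Hypothetical reactor stand-in that records enable/disable calls.
class RecordingReactor
  attr_reader :calls

  def initialize
    @enabled = {}
    @calls = []
  end

  def enable_events( socket, event )
    @calls << [ :enable, socket, event ]
    @enabled[ [ socket, event ] ] = true
  end

  def disable_events( socket, event )
    @calls << [ :disable, socket, event ]
    @enabled.delete( [ socket, event ] )
  end

  def event_enabled?( socket, event )
    @enabled.key?( [ socket, event ] )
  end
end

# Hypothetical holder for the register/unregister/registered? trio.
class WriteRegistration
  def initialize( reactor, socket )
    @reactor, @socket = reactor, socket
  end

  def registered?
    @reactor.event_enabled?( @socket, :write )
  end

  def register
    @reactor.enable_events( @socket, :write ) unless registered?
  end

  def unregister
    @reactor.disable_events( @socket, :write ) if registered?
  end
end

reactor = RecordingReactor.new
reg = WriteRegistration.new( reactor, :tree_api )
3.times { reg.register }     # e.g. three requests queued back to back
2.times { reg.unregister }   # queue drained
p reactor.calls  # => [[:enable, :tree_api, :write], [:disable, :tree_api, :write]]
```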
Restart the runner. Not yet implemented: this currently raises NotImplementedError.
    # File lib/arborist/monitor_runner.rb, line 95
    def restart
      # :TODO: Kill any running monitor children, cancel monitor timers, and reload
      # monitors from the monitor enumerator
      raise NotImplementedError
    end
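Run the loaded monitors: add a timer for each one, start the thread cleanup timer, then, with signal handling set up for QUEUE_SIGS, register the client's tree API socket for write events and start the reactor's polling loop.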
    # File lib/arborist/monitor_runner.rb, line 80
    def run
      self.monitors.each do |mon|
        self.add_timer_for( mon )
      end
      self.add_thread_cleanup_timer
      self.with_signal_handler( self.reactor, *QUEUE_SIGS ) do
        self.reactor.register( self.client.tree_api, :write, &self.method(:handle_io_event) )
        self.reactor.start_polling( ignore_interrupts: true )
      end
    end
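Search for the nodes matching the monitor's criteria; if any match, run the monitor against them in a new thread and update the nodes with the results.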
    # File lib/arborist/monitor_runner.rb, line 127
    def run_monitor( monitor )
      positive = monitor.positive_criteria
      negative = monitor.negative_criteria
      exclude_down = monitor.exclude_down?
      props = monitor.node_properties
      self.search( positive, exclude_down, props, negative ) do |nodes|
        self.log.info "Running %p monitor for %d node(s)" % [ monitor.description, nodes.length ]
        unless nodes.empty?
          self.runner_threads[ monitor ] = Thread.new do
            Thread.current[:monitor_desc] = monitor.description
            results = self.run_monitor_safely( monitor, nodes )
            self.log.debug " updating with results: %p" % [ results ]
            self.update( results, monitor.key ) do
              self.log.debug "Updated %d via the '%s' monitor" % [ results.length, monitor.description ]
            end
          end
          self.log.debug "THREAD: Started %p for %p" % [ self.runner_threads[monitor], monitor ]
          self.log.debug "THREAD: Runner threads have: %p" % [ self.runner_threads.to_a ]
        end
      end
    end
Run monitor against the provided nodes hash, treating runtime exceptions as an error condition. Returns an update hash, keyed by node identifier; if the monitor raises, every node's entry is an error describing the exception.
    # File lib/arborist/monitor_runner.rb, line 161
    def run_monitor_safely( monitor, nodes )
      results = begin
        monitor.run( nodes )
      rescue => err
        errmsg = "Exception while running %p monitor: %s: %s" % [
          monitor.description, err.class.name, err.message
        ]
        self.log.error "%s\n%s" % [ errmsg, err.backtrace.join("\n ") ]
        nodes.keys.each_with_object({}) do |id, node_results|
          node_results[id] = { error: errmsg }
        end
      end
      return results
    end
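The rescue branch fans a single exception out into one error entry per node, so every node passed to the monitor still receives an update. A minimal sketch of that behaviour without logging, assuming a hypothetical `FailingMonitor` whose run always raises:

```ruby
# Hypothetical monitor whose run always raises.
FailingMonitor = Struct.new( :description ) do
  def run( _nodes )
    raise IOError, "connection refused"
  end
end

# Same rescue strategy as run_monitor_safely, minus logging: any
# StandardError becomes an { error: ... } entry for every node.
def run_monitor_safely( monitor, nodes )
  monitor.run( nodes )
rescue => err
  errmsg = "Exception while running %p monitor: %s: %s" %
    [ monitor.description, err.class.name, err.message ]
  nodes.keys.each_with_object( {} ) do |id, node_results|
    node_results[ id ] = { error: errmsg }
  end
end

nodes = { 'web1' => {}, 'web2' => {} }
results = run_monitor_safely( FailingMonitor.new( "port check" ), nodes )
results.each { |id, update| puts "#{id}: #{update[:error]}" }
```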
Create a search request using the runner's client, then queue the request up with the specified block as the callback.
    # File lib/arborist/monitor_runner.rb, line 182
    def search( criteria, exclude_down, properties, negative={}, &block )
      search = self.client.make_search_request( criteria,
        exclude_down: exclude_down,
        properties: properties,
        exclude: negative )
      self.queue_request( search, &block )
    end
Stop the runner.
    # File lib/arborist/monitor_runner.rb, line 103
    def stop
      self.log.info "Stopping the runner."
      self.reactor.stop_polling
    end
Disable write events for the client's tree API socket on the reactor if they're enabled; called when there's nothing left to write.
    # File lib/arborist/monitor_runner.rb, line 224
    def unregister
      # self.log.debug "Unregistering for writing."
      self.reactor.disable_events( self.client.tree_api, :write ) if self.registered?
    end
Create an update request using the runner's client, then queue the request up with the specified block as the callback. Does nothing if the node map is empty.
    # File lib/arborist/monitor_runner.rb, line 194
    def update( nodemap, monitor_key, &block )
      return if nodemap.empty?
      update = self.client.make_update_request( nodemap, monitor_key: monitor_key )
      self.queue_request( update, &block )
    end
Signal Handling
Public Instance Methods
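Handle signals: INT and TERM stop the runner, HUP restarts it, USR1 is passed to on_user1_signal, and any other signal is logged as unhandled.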
    # File lib/arborist/monitor_runner.rb, line 299
    def handle_signal( sig )
      self.log.debug "Handling signal %s" % [ sig ]
      case sig
      when :INT, :TERM
        self.on_termination_signal( sig )
      when :HUP
        self.on_hangup_signal( sig )
      when :USR1
        self.on_user1_signal( sig )
      else
        self.log.warn "Unhandled signal %s" % [ sig ]
      end
    end
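handle_signal matches on signal names as Symbols, with INT and TERM sharing one branch. A sketch of the same dispatch table, assuming a hypothetical `SignalDispatchSketch` whose branches record the action they would take instead of stopping or restarting anything:

```ruby
# Hypothetical stand-in for handle_signal: each branch records the action
# the real runner would take.
class SignalDispatchSketch
  attr_reader :actions

  def initialize
    @actions = []
  end

  def handle_signal( sig )
    case sig
    when :INT, :TERM then @actions << [ :stop, sig ]      # on_termination_signal
    when :HUP        then @actions << [ :restart, sig ]   # on_hangup_signal
    when :USR1       then @actions << [ :user1, sig ]     # on_user1_signal
    else                  @actions << [ :unhandled, sig ] # logged as a warning
    end
  end
end

runner = SignalDispatchSketch.new
%i[ TERM HUP USR1 WINCH ].each { |sig| runner.handle_signal( sig ) }
p runner.actions
# => [[:stop, :TERM], [:restart, :HUP], [:user1, :USR1], [:unhandled, :WINCH]]
```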
Handle a hangup by restarting the runner.
    # File lib/arborist/monitor_runner.rb, line 328
    def on_hangup_signal( signo )
      self.log.warn "Hangup (%p)" % [ signo ]
      self.restart
    end
Handle a TERM (or INT) signal by logging it and stopping the runner, which stops the reactor's polling loop. Also aliased to on_interrupt_signal.
    # File lib/arborist/monitor_runner.rb, line 320
    def on_termination_signal( signo )
      self.log.warn "Terminated (%p)" % [ signo ]
      self.stop
    end