class Arborist::Manager
The main Arborist process – responsible for coordinating all other activity.
Constants
- QUEUE_SIGS
Signals the manager responds to
- VALID_TREEAPI_ACTIONS
Array of actions supported by the Tree API
Attributes
checkpoint_timer – The Timers::Timer that periodically checkpoints the manager's state (if it's configured to do so)
event_queue – The queue of pending Event API events
event_socket – The ZeroMQ PUB socket that publishes events for the Event API
heartbeat_timer – The Timers::Timer that periodically publishes a heartbeat event
linger – The maximum amount of time to wait for pending events to be delivered during shutdown. The code handles this value as seconds: it is multiplied by 1000 before being applied as the event socket's linger option, and half of it is added to a Time when waiting for the event queue to drain.
nodes – The Hash of all loaded Nodes, keyed by their identifier
reactor – The CZTop::Reactor that runs the event loop
root – The root node of the tree.
run_id – A unique string used to identify different runs of the Manager
start_time – The time at which the manager began running.
subscriptions – The Hash of all Subscriptions, keyed by their subscription ID
tree_socket – The ZeroMQ REP socket that handles Tree API requests
Public Class Methods
Create a new Arborist::Manager.
# File lib/arborist/manager.rb, line 97
#
# Set up a new Manager: a fresh random run ID, a node table containing only
# the root node (stored under the '_' identifier), no subscriptions, and a new
# reactor. Sockets and timers are left unset (nil) here.
def initialize
  @run_id = SecureRandom.hex( 16 )
  @root = Arborist::Node.create( :root )
  @nodes = { '_' => @root }
  @subscriptions = {}
  @tree_built = false

  @start_time = nil

  # Timers are only created once the manager registers them.
  # (The original assigned @checkpoint_timer a second time at the end of this
  # method; the redundant assignment has been dropped.)
  @checkpoint_timer = nil

  @linger = self.class.linger
  self.log.info "Linger set to %p" % [ @linger ]

  @reactor = CZTop::Reactor.new
  @tree_socket = nil
  @event_socket = nil
  @event_queue = []

  @heartbeat_timer = nil
end
Public Instance Methods
Flag for marking when the tree is built successfully the first time
# File lib/arborist/manager.rb, line 163 attr_predicate_accessor :tree_built
Node state saving/reloading
Public Instance Methods
Cancel the timer that saves tree snapshots.
# File lib/arborist/manager.rb, line 372
#
# Cancel the timer that saves tree snapshots. Returns nil without touching
# the reactor if no checkpoint timer was ever registered (the timer stays nil
# when no state file or checkpoint frequency is configured).
def cancel_checkpoint_timer
  timer = self.checkpoint_timer or return nil
  self.reactor.remove_timer( timer )
end
Cancel the timer that publishes heartbeat events.
# File lib/arborist/manager.rb, line 340
#
# Cancel the timer that publishes heartbeat events. Returns nil without
# touching the reactor if the heartbeat timer hasn't been registered yet
# (it is nil until #register_heartbeat_timer runs).
def cancel_heartbeat_timer
  timer = self.heartbeat_timer or return nil
  self.reactor.remove_timer( timer )
end
Register a periodic timer that will save a snapshot of the node tree's state to the state file on a configured interval if one is configured.
# File lib/arborist/manager.rb, line 353
#
# Register a periodic timer that saves a snapshot of the node tree's state.
# Returns nil (and registers nothing) when no state file is configured or the
# configured checkpoint frequency is nil or zero.
def register_checkpoint_timer
  if !self.class.state_file
    self.log.info "No state file configured; skipping checkpoint timer setup."
    return nil
  end

  frequency = self.class.checkpoint_frequency
  if !frequency || !frequency.nonzero?
    self.log.info "Checkpoint frequency is %p; skipping checkpoint timer setup." % [ frequency ]
    return nil
  end

  self.log.info "Setting up node state checkpoint every %0.3fs" % [ frequency ]
  @checkpoint_timer = self.reactor.add_periodic_timer( frequency ) { self.save_node_states }
end
Register a periodic timer that will publish a heartbeat event at a configurable interval.
# File lib/arborist/manager.rb, line 329
#
# Register a periodic timer that publishes a heartbeat event every
# `heartbeat_frequency` (read from the class).
def register_heartbeat_timer
  frequency = self.class.heartbeat_frequency
  self.log.info "Setting up to heartbeat every %ds" % [ frequency ]

  @heartbeat_timer = self.reactor.add_periodic_timer( frequency ) { self.publish_heartbeat_event }
end
Attempt to restore the state of the loaded nodes from the configured state file. Returns true if it succeeded, or false if a state file wasn't configured, doesn't exist, isn't readable, or couldn't be unmarshalled.
# File lib/arborist/manager.rb, line 305
#
# Attempt to restore the state of the loaded nodes from the configured state
# file. Returns +true+ on success, or +false+ if no state file is configured,
# it isn't readable, or its contents can't be unmarshalled.
#
# Saved nodes whose identifier isn't present in the current tree are skipped.
#
# NOTE(review): Marshal.load instantiates whatever the file contains, so the
# state file must only ever be written by a trusted process.
def restore_node_states
  path = self.class.state_file or return false
  return false unless path.readable?

  self.log.info "Restoring node state from %s" % [ path ]

  # Block form so the file handle is closed even when loading fails; the
  # rescue implements the documented "returns false if it can't be
  # unmarshalled" contract instead of letting the error escape.
  nodes = begin
    path.open( 'r:binary' ) {|io| Marshal.load(io) }
  rescue TypeError, ArgumentError, EOFError => err
    self.log.error "%p while restoring node state: %s" % [ err.class, err.message ]
    return false
  end

  nodes.each do |identifier, saved_node|
    self.log.debug "Loaded node: %p" % [ identifier ]
    if (( current_node = self.nodes[ identifier ] ))
      self.log.debug "Restoring state of the %p node." % [ identifier ]
      current_node.restore( saved_node )
    else
      self.log.info "Not restoring state for the %s node: not present in the loaded tree." %
        [ identifier ]
    end
  end

  return true
end
Resume the timer that saves tree snapshots.
# File lib/arborist/manager.rb, line 378
#
# Resume the timer that saves tree snapshots. Returns nil without touching
# the reactor if no checkpoint timer was ever registered (the timer stays nil
# when no state file or checkpoint frequency is configured).
def resume_checkpoint_timer
  timer = self.checkpoint_timer or return nil
  self.reactor.resume_timer( timer )
end
Resume the timer that publishes heartbeat events.
# File lib/arborist/manager.rb, line 346
#
# Resume the timer that publishes heartbeat events. Returns nil without
# touching the reactor if the heartbeat timer hasn't been registered yet
# (it is nil until #register_heartbeat_timer runs).
def resume_heartbeat_timer
  timer = self.heartbeat_timer or return nil
  self.reactor.resume_timer( timer )
end
Write out the state of all the manager's nodes to the state_file if one is configured.
# File lib/arborist/manager.rb, line 279
#
# Write out the state of all the manager's nodes to the state file, if one is
# configured; does nothing otherwise.
#
# The nodes are marshalled into a temp file created in the state file's
# directory, which is then renamed over the state file. SystemCallErrors are
# logged rather than raised; any other error propagates. The temp file is
# always closed and removed if it is still around afterwards.
def save_node_states
  start_time = Time.now
  path = self.class.state_file or return

  self.log.info "Saving current node state to %s" % [ path ]
  tmpfile = Tempfile.create(
    [ path.basename.to_s.sub(path.extname, ''), path.extname ],
    path.dirname.to_s,
    encoding: 'binary'
  )
  Marshal.dump( self.nodes, tmpfile )
  tmpfile.close

  File.rename( tmpfile.path, path.to_s )
  self.log.debug "Saved state file in %0.1f seconds." % [ Time.now - start_time ]

rescue SystemCallError => err
  self.log.error "%p while saving node state: %s" % [ err.class, err.message ]

ensure
  if tmpfile
    # The handle is still open if anything failed before the explicit close
    # above (e.g. Marshal.dump raising); don't leak it.
    tmpfile.close unless tmpfile.closed?
    File.unlink( tmpfile.path ) if File.exist?( tmpfile.path )
  end
end
Signal Handling
Public Instance Methods
Handle signals.
# File lib/arborist/manager.rb, line 390
#
# Dispatch the signal +sig+ (a Symbol such as :INT) to its handler method.
# INT and TERM share the termination handler; unknown signals are logged.
def handle_signal( sig )
  self.log.debug "Handling signal %s" % [ sig ]

  case sig
  when :INT, :TERM then self.on_termination_signal( sig )
  when :HUP        then self.on_hangup_signal( sig )
  when :USR1       then self.on_user1_signal( sig )
  else
    self.log.warn "Unhandled signal %s" % [ sig ]
  end
end
Handle a HUP signal. The default is to restart the handler.
# File lib/arborist/manager.rb, line 419
#
# HUP handler: log the signal, then call #restart.
def on_hangup_signal( signo )
  self.log.warn( "Hangup (%p)" % [ signo ] )
  restart
end
Handle a TERM signal. Shuts the handler down after handling any current request/s. Also aliased to on_interrupt_signal.
# File lib/arborist/manager.rb, line 411
#
# INT/TERM handler: log the signal, then call #stop.
def on_termination_signal( signo )
  self.log.warn( "Terminated (%p)" % [ signo ] )
  stop
end
Handle a USR1 signal. Writes a message to the log and saves the current node states.
# File lib/arborist/manager.rb, line 426
#
# USR1 handler: log a checkpoint message and save the node states.
# +signo+ is accepted for symmetry with the other handlers but not used.
def on_user1_signal( signo )
  self.log.info( "Checkpoint: User signal." )
  save_node_states
end
Startup/Shutdown
Public Instance Methods
Cancel the Manager's timers.
# File lib/arborist/manager.rb, line 229
#
# Cancel the Manager's timers: first the heartbeat timer, then the
# checkpoint timer.
def cancel_timers
  cancel_heartbeat_timer
  cancel_checkpoint_timer
end
Return a human-readable representation of the Manager suitable for debugging.
# File lib/arborist/manager.rb, line 263
#
# Debugging representation: class, doubled object ID in hex, the run ID, and
# the number of loaded nodes.
def inspect
  details = [ self.class, self.object_id * 2, self.run_id, self.nodes.length ]
  return "#<%p:%#x {runid: %s} %d nodes>" % details
end
Register the Manager's timers.
# File lib/arborist/manager.rb, line 222
#
# Register the Manager's timers: first the checkpoint timer, then the
# heartbeat timer.
def register_timers
  register_checkpoint_timer
  register_heartbeat_timer
end
Restart the manager
# File lib/arborist/manager.rb, line 250
#
# Restart the manager. Not implemented: always raises NotImplementedError.
def restart
  raise NotImplementedError
end
Setup sockets and start the event loop.
# File lib/arborist/manager.rb, line 185
#
# Set up the sockets and timers, then run the event loop with handlers
# installed for QUEUE_SIGS. However the loop exits, the sockets are shut down
# and the node states saved afterwards.
def run
  self.log.info "Getting ready to start the manager."
  setup_sockets
  register_timers

  self.with_signal_handler( reactor, *QUEUE_SIGS ) { self.start_accepting_requests }
ensure
  shutdown_sockets
  save_node_states
end
Returns true if the Manager is running.
# File lib/arborist/manager.rb, line 214
#
# Truthy when there is a reactor, an event socket, and the event socket is
# registered with the reactor. Returns the first falsy value otherwise, so
# the result may be nil rather than false.
def running?
  loop_reactor = self.reactor or return loop_reactor
  publisher = self.event_socket or return publisher

  return loop_reactor.registered?( publisher )
end
Create the sockets used by the manager and bind them to the appropriate endpoints.
# File lib/arborist/manager.rb, line 200
#
# Create and bind the manager's sockets: the Tree API socket first, then the
# event socket.
def setup_sockets
  setup_tree_socket
  setup_event_socket
end
Shut down the sockets used by the manager.
# File lib/arborist/manager.rb, line 207
#
# Shut down the manager's sockets: the Tree API socket first, then the event
# socket.
def shutdown_sockets
  shutdown_tree_socket
  shutdown_event_socket
end
Start a loop, accepting a request and handling it.
# File lib/arborist/manager.rb, line 236
#
# Record the start time, register the tree socket (for reads) and the event
# socket (for writes) with the reactor, then start polling. Returns whatever
# the reactor's polling call returns.
def start_accepting_requests
  self.log.debug "Starting the main loop"
  self.start_time = Time.now

  tree_handler  = self.method( :on_tree_socket_event )
  event_handler = self.method( :on_event_socket_event )
  self.reactor.register( self.tree_socket, :read, &tree_handler )
  self.reactor.register( self.event_socket, :write, &event_handler )

  self.log.debug "Manager running."
  return self.reactor.start_polling( ignore_interrupts: true )
end
Stop the manager.
# File lib/arborist/manager.rb, line 256
#
# Stop the manager by telling the reactor to stop polling.
def stop
  self.log.info( "Stopping the manager." )
  reactor.stop_polling
end
Tree API
Public Instance Methods
Add the specified `node` to the Manager.
# File lib/arborist/manager.rb, line 481
#
# Add +node+ to the node table under its identifier. Raises
# Arborist::NodeError if a node with that identifier is already present.
# Once the tree has been built, the node is also linked into it and a
# 'node_added' system event is published.
def add_node( node )
  identifier = node.identifier

  if self.nodes[ identifier ]
    raise Arborist::NodeError, "Node %p already present." % [ identifier ]
  end
  self.nodes[ identifier ] = node

  return unless self.tree_built?

  self.link_node( node )
  self.publish_system_event( 'node_added', node: identifier )
end
Yield each node in a depth-first traversal of the manager's tree to the specified `block`, or return an Enumerator if no block is given.
# File lib/arborist/manager.rb, line 914
#
# Enumerate every node starting from the root. With a block, each node is
# yielded to it; without one, the Enumerator itself is returned.
def all_nodes( &block )
  nodes_enum = self.enumerator_for( self.root )
  block ? nodes_enum.each( &block ) : nodes_enum
end
Return the Array of all nodes above the specified `node`.
# File lib/arborist/manager.rb, line 970
#
# Return the chain of ancestors of +node+, nearest parent first. A node with
# no parent identifier yields an empty Array.
def ancestors_for( node )
  parent_id = node.parent
  return [] unless parent_id

  parent_node = self.nodes[ parent_id ]
  return [ parent_node ] + self.ancestors_for( parent_node )
end
Build the tree out of all the loaded nodes.
# File lib/arborist/manager.rb, line 448
#
# Build the tree out of all the loaded nodes: link every non-operational node
# to its parent, mark the tree as built, register each node's secondary
# dependencies, and finally try to restore saved node state. Returns the
# result of #restore_node_states.
def build_tree
  self.log.info "Building tree from %d loaded nodes." % [ self.nodes.length ]

  # Primary structure: parent/child links
  self.nodes.each_value do |node|
    self.link_node_to_parent( node ) unless node.operational?
  end
  self.tree_built = true

  # Secondary dependencies can only be set up once the tree exists
  self.nodes.each_value {|node| node.register_secondary_dependencies(self) }

  self.restore_node_states
end
Create a subscription that publishes to the Manager's event publisher for the node with the specified `identifier` and `event_pattern`, using the given `criteria` when considering an event.
# File lib/arborist/manager.rb, line 1083
#
# Build a Subscription for +event_pattern+ with the given positive and
# negative criteria whose callback forwards its arguments to #publish, attach
# it to the node identified by +identifier+, and return it.
def create_subscription( identifier, event_pattern, criteria, negative_criteria={} )
  subscription = Arborist::Subscription.new( event_pattern, criteria, negative_criteria ) do |*args|
    self.publish( *args )
  end

  self.subscribe( identifier, subscription )
  subscription
end
Return a depth-limited enumerator for the specified `start_node`.
# File lib/arborist/manager.rb, line 945
#
# Return an Enumerator that walks the tree from +start_node+, descending at
# most +depth+ levels below it. If a +filter+ block is given, a node it
# rejects is neither yielded nor descended into.
def depth_limited_enumerator_for( start_node, depth, &filter )
  Enumerator.new do |yielder|
    walk = lambda do |node, remaining|
      self.log.debug "Enumerating nodes from %s at depth: %p" % [ node.identifier, remaining ]

      next unless filter.nil? || filter.call( node )
      yielder.yield( node )

      # Only descend while there is depth budget left
      next unless remaining > 0
      node.each {|child| walk[ child, remaining - 1 ] }
    end

    walk.call( start_node, depth )
  end
end
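Return an Array of all nodes below the specified `node`. Note that, as implemented, the traversal starts by yielding the `node` itself, so it is the first element of the result.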
# File lib/arborist/manager.rb, line 964
#
# Return the result of enumerating the tree from +node+ as an Array. The
# enumerator yields +node+ itself before its descendants.
def descendants_for( node )
  subtree = self.enumerator_for( node )
  subtree.to_a
end
Handle the specified `raw_request` and return a response.
# File lib/arborist/manager.rb, line 614
#
# Decode +raw_request+, look up the handler for its action, and return the
# handler's response. Any StandardError is logged and turned into an error
# response: MessageError, ConfigError and NodeError are reported as 'client'
# errors, everything else as 'server' errors.
def dispatch_request( raw_request )
  raise "Manager is shutting down" unless self.running?

  header, body = Arborist::TreeAPI.decode( raw_request )
  action_handler = self.lookup_tree_request_action( header )

  return action_handler.call( header, body )

rescue => err
  self.log.error "%p: %s" % [ err.class, err.message ]
  err.backtrace.each {|frame| self.log.debug " #{frame}" }

  errtype =
    case err
    when Arborist::MessageError, Arborist::ConfigError, Arborist::NodeError
      'client'
    else
      'server'
    end

  return Arborist::TreeAPI.error_response( errtype, err.message )
end
Return an enumerator for the specified `start_node`.
# File lib/arborist/manager.rb, line 931
#
# Return an Enumerator that yields +start_node+ and then recurses into
# whatever each node's #each yields. If a +filter+ block is given, a node it
# rejects is neither yielded nor descended into.
def enumerator_for( start_node, &filter )
  Enumerator.new do |yielder|
    visit = lambda do |node|
      if filter.nil? || filter.call( node )
        yielder.yield( node )
        node.each( &visit )
      end
    end

    visit.call( start_node )
  end
end
Traverse the node tree and return the specified `return_values` from any nodes which match the given `filter`, skipping downed nodes and all their children if `exclude_down` is set. If `return_values` is set to `nil`, then all values from the node will be returned.
# File lib/arborist/manager.rb, line 542
#
# Return a Hash of node identifier => fetched values for every node that
# matches +filter+ and, when +negative_filter+ is non-empty, does not match
# it. Nodes are taken from #reachable_nodes if +exclude_down+ is set, and
# from #all_nodes otherwise.
def find_matching_node_states( filter, return_values, exclude_down=false, negative_filter={} )
  candidates = exclude_down ? self.reachable_nodes : self.all_nodes

  matching = candidates.select {|node| node.matches?(filter) }
  wanted = matching.reject do |node|
    !negative_filter.empty? && node.matches?( negative_filter )
  end

  states = wanted.each_with_object( {} ) do |node, result|
    result[ node.identifier ] = node.fetch_values( return_values )
  end

  return states
end
Acknowledge a node
# File lib/arborist/manager.rb, line 870
#
# Tree API `ack` action: acknowledge the node named by the 'identifier'
# header, passing the request body (with symbolified keys) as keyword
# arguments, and propagate the resulting events. Responds with a client error
# if the identifier is missing or unknown.
def handle_ack_request( header, body )
  self.log.info "ACK: %p" % [ header ]

  identifier = header[ 'identifier' ]
  unless identifier
    return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for ACK.' )
  end

  node = self.nodes[ identifier ]
  unless node
    return Arborist::TreeAPI.error_response( 'client', "No such node %p" % [identifier] )
  end

  self.log.debug "Acking the %s node: %p" % [ identifier, body ]

  body = symbolify_keys( body )
  events = node.acknowledge( **body )
  self.propagate_events( node, events )

  return Arborist::TreeAPI.successful_response( nil )
end
Return a response to the `deps` action.
# File lib/arborist/manager.rb, line 728
#
# Tree API `deps` action: respond with the identifiers collected by
# #merge_dependencies_from for the 'from' header (default: the root, '_'),
# excluding the starting node itself. An Arborist::ClientError is reported as
# a client error response.
def handle_deps_request( header, body )
  self.log.info "DEPS: %p" % [ header ]
  start_id = header['from'] || '_'

  dependencies = self.merge_dependencies_from( start_id )
  dependencies.delete( start_id )

  return Arborist::TreeAPI.successful_response({ deps: dependencies.to_a })

rescue Arborist::ClientError => err
  return Arborist::TreeAPI.error_response( 'client', err.message )
end
Return a response to the `fetch` action.
# File lib/arborist/manager.rb, line 701
#
# Tree API `fetch` action: respond with the nodes at or under the 'from'
# header (default: the root, '_') as Hashes. With the 'tree' header set the
# start node is serialized once with the requested depth (default -1);
# otherwise the nodes are listed flat, optionally limited by 'depth'.
def handle_fetch_request( header, body )
  self.log.info "FETCH: %p" % [ header ]
  from  = header['from'] || '_'
  depth = header['depth']
  tree  = header['tree']

  start_node = self.nodes[ from ]
  unless start_node
    return Arborist::TreeAPI.error_response( 'client', "No such node %s." % [from] )
  end
  self.log.debug " Listing nodes under %p" % [ start_node ]

  iter =
    if tree
      [ start_node.to_h(depth: (depth || -1)) ]
    elsif depth
      self.log.debug " depth limited to %d" % [ depth ]
      self.depth_limited_enumerator_for( start_node, depth )
    else
      self.log.debug " no depth limit"
      self.enumerator_for( start_node )
    end

  data = iter.map( &:to_h )
  self.log.debug " got data for %d nodes" % [ data.length ]

  return Arborist::TreeAPI.successful_response( data )
end
Add a node
# File lib/arborist/manager.rb, line 811
#
# Tree API `graft` action: create a node of the 'type' header with the
# 'identifier' header under the 'parent' header (default: the root, '_') and
# add it to the manager. Responds with a client error if the identifier or
# type is missing, the identifier is taken, or the parent doesn't exist.
def handle_graft_request( header, body )
  self.log.info "GRAFT: %p" % [ header ]

  identifier = header[ 'identifier' ]
  unless identifier
    return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for GRAFT.' )
  end

  if self.nodes[ identifier ]
    return Arborist::TreeAPI.error_response( 'client', "Node %p already exists." % [identifier] )
  end

  type = header[ 'type' ]
  unless type
    return Arborist::TreeAPI.error_response( 'client', 'No type specified for GRAFT.' )
  end

  parent = header[ 'parent' ] || '_'
  parent_node = self.nodes[ parent ]
  unless parent_node
    return Arborist::TreeAPI.error_response( 'client', 'No parent node found for %s.' % [parent] )
  end

  self.log.debug "Grafting a new %s node under %p" % [ type, parent_node ]

  # If the parent has a factory method for the node type, use it, otherwise
  # use the Pluggability factory.
  # NOTE(review): `type` comes straight from the request header, so this calls
  # any public method of the parent node that the client names — confirm
  # that's acceptable, or restrict it to known node types.
  node =
    if parent_node.respond_to?( type )
      parent_node.method( type ).call( identifier, body )
    else
      body.merge!( parent: parent )
      Arborist::Node.create( type, identifier, body )
    end

  self.add_node( node )

  return Arborist::TreeAPI.successful_response( node ? {identifier: node.identifier} : nil )
end
Modify a node's operational attributes
# File lib/arborist/manager.rb, line 845
#
# Tree API `modify` action: apply the request body to the node named by the
# 'identifier' header. A 'parent' key in the body is removed from it and used
# to reparent the node first. Responds with a client error for a missing or
# unknown identifier, the root node ('_'), or an unknown new parent.
def handle_modify_request( header, body )
  self.log.info "MODIFY: %p" % [ header ]

  identifier = header[ 'identifier' ]
  unless identifier
    return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for MODIFY.' )
  end
  if identifier == '_'
    return Arborist::TreeAPI.error_response( 'client', "Unable to MODIFY root node." )
  end

  node = self.nodes[ identifier ]
  unless node
    return Arborist::TreeAPI.error_response( 'client', "No such node %p" % [identifier] )
  end

  self.log.debug "Modifying operational attributes of the %s node: %p" % [ identifier, body ]

  new_parent_identifier = body.delete( 'parent' )
  if new_parent_identifier
    old_parent = self.nodes[ node.parent ]
    new_parent = self.nodes[ new_parent_identifier ]
    unless new_parent
      return Arborist::TreeAPI.error_response( 'client', "No such parent node: %p" % [new_parent_identifier] )
    end

    node.reparent( old_parent, new_parent )
  end

  node.modify( body )

  return Arborist::TreeAPI.successful_response( nil )
end
Remove a node and its children.
# File lib/arborist/manager.rb, line 799
#
# Tree API `prune` action: remove the node named by the 'identifier' header
# via #remove_node and respond with the removed node as a Hash, or with nil
# if nothing was removed.
def handle_prune_request( header, body )
  self.log.info "PRUNE: %p" % [ header ]

  identifier = header[ 'identifier' ]
  unless identifier
    return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for PRUNE.' )
  end

  removed = self.remove_node( identifier )
  payload = removed ? removed.to_h : nil

  return Arborist::TreeAPI.successful_response( payload )
end
Return a response to the 'search' action.
# File lib/arborist/manager.rb, line 762
#
# Tree API `search` action: respond with the states of the nodes matching the
# criteria in the body. The body is either a single positive criteria value
# or an Array of [ positive, negative ]. If the 'return' header is present it
# selects the values to return (nil there means an empty list); if it is
# absent, nil is passed on.
def handle_search_request( header, body )
  self.log.info "SEARCH: %p" % [ header ]

  exclude_down = header['exclude_down']
  values = header.key?( 'return' ) ? ( header['return'] || [] ) : nil

  body = [ body ] unless body.is_a?( Array )
  positive = body.shift
  negative = body.shift || {}

  states = self.find_matching_node_states( positive, values, exclude_down, negative )

  return Arborist::TreeAPI.successful_response( states )
end
Return a response to the `status` action.
# File lib/arborist/manager.rb, line 655
#
# Tree API `status` action: respond with the server version, whether the
# manager is running, its uptime, and the number of nodes.
def handle_status_request( header, body )
  self.log.info "STATUS: %p" % [ header ]

  state = self.running? ? 'running' : 'not running'

  return Arborist::TreeAPI.successful_response(
    server_version: Arborist::VERSION,
    state: state,
    uptime: self.uptime,
    nodecount: self.nodecount
  )
end
Return a response to the `subscribe` action.
# File lib/arborist/manager.rb, line 667
#
# Tree API `subscribe` action: create a subscription for the 'event_type'
# header on the node named by the 'identifier' header and respond with the
# subscription's ID. The body is either a single positive criteria value or
# an Array of [ positive, negative ].
def handle_subscribe_request( header, body )
  self.log.info "SUBSCRIBE: %p" % [ header ]
  event_type      = header[ 'event_type' ]
  node_identifier = header[ 'identifier' ]

  body = [ body ] unless body.is_a?( Array )
  positive = body.shift
  negative = body.shift || {}

  subscription = self.create_subscription( node_identifier, event_type, positive, negative )

  log_args = [ event_type || 'all', node_identifier || 'the root node', subscription ]
  self.log.info "Subscription to %s events at or under %s: %p" % log_args

  return Arborist::TreeAPI.successful_response( id: subscription.id )
end
Un-acknowledge a node
# File lib/arborist/manager.rb, line 889
#
# Tree API `unack` action: un-acknowledge the node named by the 'identifier'
# header and propagate the resulting events. Responds with a client error if
# the identifier is missing or unknown.
def handle_unack_request( header, body )
  self.log.info "UNACK: %p" % [ header ]

  identifier = header[ 'identifier' ]
  unless identifier
    return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for UNACK.' )
  end

  node = self.nodes[ identifier ]
  unless node
    return Arborist::TreeAPI.error_response( 'client', "No such node %p" % [identifier] )
  end

  self.log.debug "Unacking the %s node: %p" % [ identifier, body ]

  events = node.unacknowledge
  self.propagate_events( node, events )

  return Arborist::TreeAPI.successful_response( nil )
end
Return a response to the `unsubscribe` action.
# File lib/arborist/manager.rb, line 685
#
# Tree API `unsubscribe` action: remove the subscription named by the
# 'subscription_id' header and respond with its event type and criteria. An
# unknown subscription ID gets a successful nil response; a missing header
# gets a client error.
def handle_unsubscribe_request( header, body )
  self.log.info "UNSUBSCRIBE: %p" % [ header ]

  subscription_id = header[ 'subscription_id' ]
  unless subscription_id
    return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for UNSUBSCRIBE.' )
  end

  subscription = self.remove_subscription( subscription_id )
  return Arborist::TreeAPI.successful_response( nil ) unless subscription

  self.log.info "Destroyed subscription: %p" % [ subscription ]

  return Arborist::TreeAPI.successful_response(
    event_type: subscription.event_type,
    criteria: subscription.criteria
  )
end
Update nodes using the data from the update request's `body`.
# File lib/arborist/manager.rb, line 782
#
# Tree API `update` action: for each identifier/properties pair in the body,
# update that node, tagging the update with the 'monitor_key' header.
# Responds with a client error if the body doesn't respond to #each.
def handle_update_request( header, body )
  self.log.info "UPDATE: %p" % [ header ]

  if !body.respond_to?( :each )
    return Arborist::TreeAPI.error_response( 'client', 'Malformed update: body does not respond to #each' )
  end

  monitor_key = header['monitor_key']
  body.each {|identifier, properties| self.update_node( identifier, properties, monitor_key ) }

  return Arborist::TreeAPI.successful_response( nil )
end
Link the node to other nodes in the tree.
# File lib/arborist/manager.rb, line 495
#
# Link +node+ into an already-built tree: attach it to its parent and
# register its secondary dependencies. Raises if the tree isn't built yet.
def link_node( node )
  raise "Tree is not built yet" if !self.tree_built?

  link_node_to_parent( node )
  node.register_secondary_dependencies( self )
end
Link the specified `node` to its parent. Raises an error if the specified `node`'s parent is not yet loaded.
# File lib/arborist/manager.rb, line 469
#
# Add +node+ as a child of its parent node; a node without a parent
# identifier is attached to the root ('_'). Raises if the parent isn't in the
# node table.
def link_node_to_parent( node )
  self.log.debug "Linking node %p to its parent" % [ node ]

  parent_id = node.parent || '_'
  parent_node = self.nodes[ parent_id ]
  raise "no parent '%s' node loaded for %p" % [ parent_id, node ] unless parent_node

  self.log.debug "adding %p as a child of %p" % [ node, parent_node ]
  parent_node.add_child( node )
end
Add nodes yielded from the specified `enumerator` into the manager's tree.
# File lib/arborist/manager.rb, line 439
#
# Add every node yielded by +enumerator+ to the manager, then build the tree.
def load_tree( enumerator )
  enumerator.each {|node| self.add_node(node) }
  build_tree
end
Given a request `header`, return a call-able object that can handle the request.
# File lib/arborist/manager.rb, line 640
#
# Given a request +header+, return the Method object that handles its action
# (`handle_<action>_request`).
#
# Raises Arborist::MessageError if the header's 'version' isn't 1, if the
# 'action' is missing, or if the action isn't one of VALID_TREEAPI_ACTIONS.
def lookup_tree_request_action( header )
  version = header['version']

  # Formatted with %p rather than %d: with %d a missing or non-numeric version
  # made the format itself raise TypeError/ArgumentError, so the client never
  # saw the intended MessageError.
  raise Arborist::MessageError, "unsupported version %p" % [ version ] unless version == 1

  action = header['action'] or
    raise Arborist::MessageError, "missing required header 'action'"
  raise Arborist::MessageError, "No such action '%s'" % [ action ] unless
    VALID_TREEAPI_ACTIONS.include?( action )

  handler_name = "handle_%s_request" % [ action ]
  return self.method( handler_name )
end
Recurse into the children and secondary dependencies of the `from` node and merge the identifiers of the traversed nodes into the `deps_set`.
# File lib/arborist/manager.rb, line 744
#
# Collect into +deps_set+ the identifier +from+, the identifiers of every
# node enumerated from it, and (recursively) the same for each of those
# nodes' node subscribers. Returns +deps_set+.
#
# An identifier that is already in the set is not traversed again, which also
# keeps dependency cycles from recursing forever.
#
# Raises Arborist::ClientError if +from+ doesn't name a loaded node.
def merge_dependencies_from( from, deps_set=Set.new )
  return deps_set unless deps_set.add?( from )

  # The original was missing the comma after the exception class, which made
  # Ruby parse it as a call to a method named ClientError and raise
  # NoMethodError instead.
  start_node = self.nodes[ from ] or
    raise Arborist::ClientError, "No such node %s." % [ from ]

  self.enumerator_for( start_node ).each do |subnode|
    deps_set.add( subnode.identifier )
    subnode.node_subscribers.each do |subdep|
      self.merge_dependencies_from( subdep, deps_set )
    end
  end

  return deps_set
end
Return the number of nodes in the manager's tree.
# File lib/arborist/manager.rb, line 568
#
# Number of entries in the manager's node table.
def nodecount
  node_table = self.nodes
  node_table.length
end
Return an Array of the identifiers of all nodes in the manager's tree.
# File lib/arborist/manager.rb, line 574
#
# Identifiers (the keys of the node table) of all the manager's nodes.
def nodelist
  node_table = self.nodes
  node_table.keys
end
IO event handler for the event socket.
# File lib/arborist/manager.rb, line 1033
#
# IO event handler for the event socket: on a writable event, send the next
# queued message (if any), then unregister the socket once the queue is
# empty. Raises for any other kind of event.
def on_event_socket_event( event )
  raise "Unhandled event %p on the event socket" % [ event ] unless event.writable?

  msg = self.event_queue.shift
  event.socket << msg if msg

  self.unregister_event_socket if self.event_queue.empty?
end
ZMQ::Handler API – Read and handle an incoming request.
# File lib/arborist/manager.rb, line 602
#
# IO event handler for the Tree API socket: on a readable event, receive a
# request, dispatch it, and write the response back to the same socket.
# Raises for any other kind of event.
def on_tree_socket_event( event )
  raise "Unsupported event %p on tree API socket!" % [ event ] unless event.readable?

  request = event.socket.receive
  response = self.dispatch_request( request )
  event.socket << response
end
Propagate one or more `events` to the specified `node` and its ancestors in the tree, publishing them to matching subscriptions belonging to the nodes along the way.
# File lib/arborist/manager.rb, line 1103
#
# Publish +events+ to +node+, then repeat for its parent and so on up the
# tree until a node without a parent is reached. Raises if a node names a
# parent that isn't in the node table.
def propagate_events( node, *events )
  node.publish_events( *events )

  if node.parent
    # Callers in this class pass the events as a single Array, so the splat
    # holds one element; flatten for the log so it reports the real count.
    self.log.debug "Propagating %d events from %s -> %s" % [
      events.flatten.length,
      node.identifier,
      node.parent
    ]
    parent = self.nodes[ node.parent ] or raise "couldn't find parent %p of node %p!" %
      [ node.parent, node.identifier ]
    self.propagate_events( parent, *events )
  end
end
Publish the specified `event`.
# File lib/arborist/manager.rb, line 1010
#
# Encode +event+ (as a Hash) under +identifier+ and append it to the event
# queue. If the manager is running, also make sure the event socket is
# registered for writing.
def publish( identifier, event )
  encoded = Arborist::EventAPI.encode( identifier, event.to_h )
  self.event_queue << encoded

  self.register_event_socket if self.running?
end
Publish a system event that observers can watch for to detect restarts.
# File lib/arborist/manager.rb, line 1048
#
# Publish a 'heartbeat' system event carrying the run ID, the start time
# (ISO 8601) and the Arborist version. Does nothing until the start time has
# been set.
def publish_heartbeat_event
  started_at = self.start_time
  return unless started_at

  self.publish_system_event( 'heartbeat',
    run_id: self.run_id,
    start_time: started_at.iso8601,
    version: Arborist::VERSION
  )
end
Publish an event with the specified `eventname` and `data`.
# File lib/arborist/manager.rb, line 1059
#
# Publish +data+ as a system event. The event name is stringified and
# prefixed with 'sys.' unless it already starts with that prefix.
def publish_system_event( eventname, **data )
  eventname = eventname.to_s
  eventname = "sys.#{eventname}" if !eventname.start_with?( 'sys.' )

  self.log.debug "Publishing %s event: %p." % [ eventname, data ]
  self.publish( eventname, data )
end
Yield each node that is not down to the specified `block`, or return an Enumerator if no block is given.
# File lib/arborist/manager.rb, line 923
#
# Enumerate from the root, keeping only nodes that answer true to
# #reachable? (an unreachable node's subtree is skipped as well). With a
# block, each node is yielded to it; without one, the Enumerator is returned.
def reachable_nodes( &block )
  reachable = self.enumerator_for( self.root ) {|node| node.reachable? }
  block ? reachable.each( &block ) : reachable
end
Register the publisher with the reactor if it's not already.
# File lib/arborist/manager.rb, line 1017
#
# Enable write events for the event socket on the reactor, unless they are
# already enabled.
def register_event_socket
  self.log.debug "Registering event socket for write events."

  if !self.reactor.event_enabled?( self.event_socket, :write )
    self.reactor.enable_events( self.event_socket, :write )
  end
end
Remove a `node` from the Manager. The `node` can either be the Arborist::Node to remove, or the identifier of a node.
# File lib/arborist/manager.rb, line 505
#
# Remove +node+ (an Arborist::Node or a node identifier) and, recursively,
# its children. A 'node_removed' system event is published for each removed
# node. Returns the removed node, or nil if there was no such node. Raises if
# the node is operational.
def remove_node( node )
  node = self.nodes[ node ] unless node.is_a?( Arborist::Node )
  return unless node

  raise "Can't remove an operational node" if node.operational?

  self.log.info "Removing node %p" % [ node ]
  self.publish_system_event( 'node_removed', node: node.identifier )

  node.children.each do |_identifier, child_node|
    self.remove_node( child_node )
  end

  parent_node = self.nodes[ node.parent || '_' ]
  parent_node.remove_child( node ) if parent_node

  return self.nodes.delete( node.identifier )
end
Remove the subscription with the specified `subscription_identifier` from the node it's attached to and from the manager, and return it.
# File lib/arborist/manager.rb, line 1095
#
# Forget the subscription with the given ID and detach it from the node it
# was registered on. Returns whatever the node's #remove_subscription
# returns, or nil if the ID is unknown.
def remove_subscription( subscription_identifier )
  owner = self.subscriptions.delete( subscription_identifier )
  return nil unless owner

  return owner.remove_subscription( subscription_identifier )
end
Return the current root node.
# File lib/arborist/manager.rb, line 907
#
# Return the node stored under the root identifier ('_').
def root_node
  node_table = self.nodes
  node_table[ '_' ]
end
Set up the ZMQ PUB socket for published events.
# File lib/arborist/manager.rb, line 982
#
# Create the PUB socket for published events, set its linger option to the
# configured linger multiplied by 1000 (rounded up), and bind it to the
# Event API URL.
def setup_event_socket
  @event_socket = CZTop::Socket::PUB.new

  endpoint = Arborist.event_api_url
  self.log.info " binding the event socket (%#0x) to %p" % [ @event_socket.object_id * 2, endpoint ]

  linger_option = ( self.linger * 1000 ).ceil
  @event_socket.options.linger = linger_option
  @event_socket.bind( Arborist.event_api_url )
end
Set up the ZeroMQ REP socket for the Tree API.
# File lib/arborist/manager.rb, line 585
#
# Create the REP socket for the Tree API, set its linger option to 0, and
# bind it to the Tree API URL.
def setup_tree_socket
  @tree_socket = CZTop::Socket::REP.new

  endpoint = Arborist.tree_api_url
  self.log.info " binding the tree API socket (%#0x) to %p" % [ @tree_socket.object_id * 2, endpoint ]

  @tree_socket.options.linger = 0
  @tree_socket.bind( Arborist.tree_api_url )
end
Stop accepting events to be published
# File lib/arborist/manager.rb, line 992
#
# Stop accepting events to be published: wait up to half the configured
# linger for the event queue to empty, then unbind and discard the event
# socket.
#
# Does nothing if the event socket was never created. This method runs from
# #run's ensure clause, so without the guard a failure during socket setup
# would be masked by a NoMethodError on nil here.
def shutdown_event_socket
  return unless @event_socket

  start = Time.now
  timeout = start + (self.linger.to_f / 2.0)

  self.log.warn "Waiting to empty the event queue..."
  until self.event_queue.empty?
    sleep 0.1
    break if Time.now > timeout
  end
  self.log.warn " ... waited %0.1f seconds" % [ Time.now - start ]

  @event_socket.options.linger = 0
  @event_socket.unbind( @event_socket.last_endpoint )
  @event_socket = nil
end
Tear down the ZeroMQ REP socket.
# File lib/arborist/manager.rb, line 595
#
# Tear down the ZeroMQ REP socket: unbind it from its last endpoint and
# discard it.
#
# Does nothing if the tree socket was never created. This method runs from
# #run's ensure clause, so without the guard a failure during socket setup
# would be masked by a NoMethodError on nil here.
def shutdown_tree_socket
  return unless @tree_socket

  @tree_socket.unbind( @tree_socket.last_endpoint )
  @tree_socket = nil
end
Add the specified `subscription` to the node corresponding with the given `identifier`.
# File lib/arborist/manager.rb, line 1068
#
# Attach +subscription+ to the node named by +identifier+ (the root, '_',
# when nil) and remember which node owns it, keyed by the subscription's ID.
# Raises ArgumentError if there is no such node.
def subscribe( identifier, subscription )
  identifier ||= '_'

  node = self.nodes[ identifier ]
  raise ArgumentError, "no such node %p" % [ identifier ] unless node

  self.log.debug "Registering subscription %p" % [ subscription ]
  node.add_subscription( subscription )

  self.log.debug " adding '%s' to the subscriptions hash." % [ subscription.id ]
  self.subscriptions[ subscription.id ] = node

  self.log.debug " subscriptions hash: %#0x" % [ self.subscriptions.object_id ]
end
Unregister the event publisher socket from the reactor if it's registered.
# File lib/arborist/manager.rb, line 1025
#
# Disable write events for the event socket on the reactor, if they are
# currently enabled.
def unregister_event_socket
  self.log.debug "Unregistering event socket for write events."

  if self.reactor.event_enabled?( self.event_socket, :write )
    self.reactor.disable_events( self.event_socket, :write )
  end
end
Update the node with the specified `identifier` with the given `new_properties` and propagate any events generated by the update to the node and its ancestors.
# File lib/arborist/manager.rb, line 527
#
# Update the node named by +identifier+ with +new_properties+ (attributed to
# +monitor_key+) and propagate the events the update generates. An update for
# an unknown node is logged and ignored, returning an empty Array.
def update_node( identifier, new_properties, monitor_key='_' )
  node = self.nodes[ identifier ]

  if !node
    self.log.warn "Update for non-existent node %p ignored." % [ identifier ]
    return []
  end

  events = node.update( new_properties, monitor_key )
  self.propagate_events( node, events )
end
Return the duration the manager has been running in seconds.
# File lib/arborist/manager.rb, line 561
#
# Seconds elapsed since the manager's start time, or 0 if it hasn't been set.
def uptime
  started_at = self.start_time
  started_at ? Time.now - self.start_time : 0
end