class Mamiya::Master::AgentMonitor
Class to monitor agent's status. This collects all agents' status. Statuses are updated by event from agent, and running serf query `mamiya:status` periodically.
Constants
- DEFAULT_INTERVAL
- PACKAGES_QUERY
- PACKAGE_STATUS_KEYS
- STATUS_QUERY
Attributes
agents[R]
failed_agents[R]
last_refresh_at[R]
Public Class Methods
new(master, raise_exception: false)
click to toggle source
# File lib/mamiya/master/agent_monitor.rb, line 24 def initialize(master, raise_exception: false) @master = master @interval = (master.config[:master] && master.config[:master][:monitor] && master.config[:master][:monitor][:refresh_interval]) || DEFAULT_INTERVAL @raise_exception = raise_exception @agents = {}.freeze @failed_agents = [].freeze @statuses = {} @commit_lock = Mutex.new @last_refresh_at = nil end
Public Instance Methods
application_status(app, labels: nil)
click to toggle source
# File lib/mamiya/master/agent_monitor.rb, line 58 def application_status(app, labels: nil) ApplicationStatus.new(self, app, labels: labels) end
commit_event(event)
click to toggle source
# File lib/mamiya/master/agent_monitor.rb, line 91 def commit_event(event) @commit_lock.synchronize { commit_event_without_lock(event) } end
commit_event_without_lock(event)
click to toggle source
# File lib/mamiya/master/agent_monitor.rb, line 95 def commit_event_without_lock(event) return unless /\Amamiya:/ === event.user_event method_name = event.user_event[7..-1].gsub(/:/, '__').gsub(/-/,'_') return unless self.respond_to?(method_name, true) payload = JSON.parse(event.payload) agent = @statuses[payload["name"]] return unless agent logger.debug "Commiting #{event.user_event}" logger.debug "- #{agent.inspect}" __send__ method_name, agent, payload, event logger.debug "+ #{agent.inspect}" rescue JSON::ParserError => e logger.warn "Failed to parse payload in event #{event.user_event}: #{e.message}" end
package_status(app, pkg, labels: nil)
click to toggle source
# File lib/mamiya/master/agent_monitor.rb, line 54 def package_status(app, pkg, labels: nil) PackageStatus.new(self, app, pkg, labels: labels) end
refresh(**kwargs)
click to toggle source
# File lib/mamiya/master/agent_monitor.rb, line 114 def refresh(**kwargs) logger.debug "Refreshing..." new_agents = {} new_failed_agents = Set.new new_statuses = {} @master.serf.members.each do |member| new_agents[member["name"]] = member new_failed_agents.add(member["name"]) unless member["status"] == 'alive' end @commit_lock.synchronize { if kwargs[:node] new_statuses = statuses.reject do |name, status| kwargs[:node].include?(name) end end status_query_th = Thread.new { @master.serf.query(STATUS_QUERY, '', **kwargs) } packages_query_th = Thread.new { @master.serf.query(PACKAGES_QUERY, '', **kwargs) } status_response = status_query_th.value packages_response = packages_query_th.value status_response["Responses"].each do |name, json| begin new_statuses[name] = JSON.parse(json) rescue JSON::ParserError => e logger.warn "Failed to parse status from #{name}: #{e.message}" new_failed_agents << name next end end packages_response["Responses"].each do |name, json| next unless new_statuses[name] begin resp = JSON.parse(json) PACKAGE_STATUS_KEYS.each do |k| new_statuses[name][k] = resp[k] end rescue JSON::ParserError => e logger.warn "Failed to parse packages from #{name}: #{e.message}" next end end (new_statuses.keys - packages_response["Responses"].keys).each do |name| PACKAGE_STATUS_KEYS.each do |k| if @statuses[name] && @statuses[name][k] new_statuses[name][k] = @statuses[name][k] end end end new_failed_agents = new_failed_agents.to_a (new_agents.keys - @agents.keys).join(", ").tap do |agents| logger.info "Added agents: #{agents}" unless agents.empty? end (@agents.keys - new_agents.keys).join(", ").tap do |agents| logger.info "Removed agents: #{agents}" unless agents.empty? end (failed_agents - new_failed_agents).join(", ").tap do |agents| logger.info "Recovered agents: #{agents}" unless agents.empty? end (new_failed_agents - failed_agents).join(", ").tap do |agents| logger.info "Newly failed agents: #{agents}" unless agents.empty? end @agents = new_agents.freeze @failed_agents = new_failed_agents.freeze @statuses = new_statuses @last_refresh_at = Time.now } self end
running?()
click to toggle source
# File lib/mamiya/master/agent_monitor.rb, line 76 def running? @thread && @thread.alive? end
start!()
click to toggle source
# File lib/mamiya/master/agent_monitor.rb, line 62 def start! @thread ||= Thread.new do loop do self.work_loop sleep @interval end end end
statuses(labels: nil)
click to toggle source
# File lib/mamiya/master/agent_monitor.rb, line 42 def statuses(labels: nil) if labels @statuses.select { |name, status| status['labels'] && Mamiya::Util::LabelMatcher::Simple.new(status['labels']). match?(labels) } else @statuses end end
stop!()
click to toggle source
# File lib/mamiya/master/agent_monitor.rb, line 71 def stop! @thread.kill if running? @thread = nil end
work_loop()
click to toggle source
# File lib/mamiya/master/agent_monitor.rb, line 80 def work_loop self.refresh rescue Exception => e raise e if @raise_exception logger.fatal "Periodical refreshing failed: #{e.class}: #{e.message}" e.backtrace.each do |line| logger.fatal "\t#{line}" end end
Private Instance Methods
logger()
click to toggle source
# File lib/mamiya/master/agent_monitor.rb, line 200 def logger @logger ||= @master.logger.with_clean_progname['agent-monitor'] end