class Mamiya::Master::AgentMonitor

Monitors the status of agents and collects the statuses of all of them. Statuses are updated by events sent from agents and by periodically running the serf query `mamiya:status`.

Constants

DEFAULT_INTERVAL
PACKAGES_QUERY
PACKAGE_STATUS_KEYS
STATUS_QUERY

Attributes

agents[R]
failed_agents[R]
last_refresh_at[R]

Public Class Methods

new(master, raise_exception: false)
# File lib/mamiya/master/agent_monitor.rb, line 24
# Builds a monitor for +master+ with empty caches; nothing is queried
# until #refresh (directly or via #start!) runs.
#
# master          - object exposing +config+, +serf+ and +logger+.
# raise_exception - when true, #work_loop re-raises refresh errors instead
#                   of logging them.
def initialize(master, raise_exception: false)
  @master = master
  @raise_exception = raise_exception

  # Use config[:master][:monitor][:refresh_interval] only when every level
  # of the nested config is present; otherwise fall back to the default.
  master_section = master.config[:master]
  monitor_section = master_section && master_section[:monitor]
  refresh_interval = monitor_section && monitor_section[:refresh_interval]
  @interval = refresh_interval || DEFAULT_INTERVAL

  @agents = {}.freeze
  @failed_agents = [].freeze
  @statuses = {}
  @commit_lock = Mutex.new
  @last_refresh_at = nil
end

Public Instance Methods

application_status(app, labels: nil)
# File lib/mamiya/master/agent_monitor.rb, line 58
# Returns an ApplicationStatus view of +app+ backed by this monitor,
# optionally restricted to agents matching +labels+.
def application_status(app, labels: nil)
  filter = { labels: labels }
  ApplicationStatus.new(self, app, **filter)
end
commit_event(event)
# File lib/mamiya/master/agent_monitor.rb, line 91
# Applies a serf user event to the cached statuses while holding
# @commit_lock, so it cannot interleave with #refresh installing new data.
# Returns whatever #commit_event_without_lock returns.
def commit_event(event)
  @commit_lock.synchronize do
    commit_event_without_lock(event)
  end
end
commit_event_without_lock(event)
# File lib/mamiya/master/agent_monitor.rb, line 95
# Applies one serf user event to the cached status of the agent that sent it.
#
# Only events whose name starts with "mamiya:" are handled. The rest of the
# name becomes a handler method name: ":" is replaced by "__" and "-" by "_"
# (e.g. "mamiya:pkg:fetch-start" -> "pkg__fetch_start"). The event is
# ignored when no such method (public or private) exists, when the payload
# is not valid JSON or not a JSON object, or when payload["name"] does not
# name an agent with a cached status. Otherwise the handler is invoked as
# handler(agent_status, payload, event) and may mutate agent_status in place.
#
# This method takes no lock itself; #commit_event wraps it in @commit_lock.
#
# NOTE(review): the handler name is derived from the event name, which any
# serf member can emit, and respond_to?(name, true) also matches private
# Kernel/Object methods. Consider dispatching only to an explicit whitelist
# of handler methods.
def commit_event_without_lock(event)
  return unless /\Amamiya:/ === event.user_event

  # Drop the 7-character "mamiya:" prefix, then map to a method name.
  method_name = event.user_event[7..-1].gsub(':', '__').tr('-', '_')
  return unless respond_to?(method_name, true)

  payload = JSON.parse(event.payload)
  # Valid JSON that is not an object (array, string, number, null) has no
  # "name" key; indexing it would raise TypeError/NoMethodError, so treat it
  # as a malformed payload instead of crashing the caller.
  unless payload.is_a?(Hash)
    logger.warn "Ignoring non-object payload in event #{event.user_event}"
    return
  end

  agent = @statuses[payload["name"]]
  return unless agent

  logger.debug "Commiting #{event.user_event}"
  logger.debug "- #{agent.inspect}"
  __send__ method_name, agent, payload, event
  logger.debug "+ #{agent.inspect}"
rescue JSON::ParserError => e
  logger.warn "Failed to parse payload in event #{event.user_event}: #{e.message}"
end
package_status(app, pkg, labels: nil)
# File lib/mamiya/master/agent_monitor.rb, line 54
# Returns a PackageStatus view of package +pkg+ of +app+ backed by this
# monitor, optionally restricted to agents matching +labels+.
def package_status(app, pkg, labels: nil)
  filter = { labels: labels }
  PackageStatus.new(self, app, pkg, **filter)
end
refresh(**kwargs)
# File lib/mamiya/master/agent_monitor.rb, line 114
# Rebuilds the cached agent list, failed-agent list and per-agent statuses
# from serf, then installs them under @commit_lock.
#
# Membership comes from @master.serf.members; a member whose "status" is not
# 'alive' is recorded as failed. Agent statuses and package information come
# from STATUS_QUERY and PACKAGES_QUERY, which are sent concurrently in two
# threads.
#
# kwargs - forwarded unchanged to both @master.serf.query calls. When
#          kwargs[:node] is given (it only needs to respond to include?),
#          the cached statuses of agents NOT listed in it are carried over.
#
# Returns self. Errors raised by the serf calls propagate to the caller
# (Thread#value re-raises an exception that ended a query thread).
def refresh(**kwargs)
  logger.debug "Refreshing..."

  new_agents = {}
  new_failed_agents = Set.new
  new_statuses = {}

  @master.serf.members.each do |member|
    new_agents[member["name"]] = member
    new_failed_agents.add(member["name"]) unless member["status"] == 'alive'
  end

  # NOTE(review): both serf queries run while @commit_lock is held, so
  # #commit_event callers block until this refresh has finished.
  @commit_lock.synchronize { 
    # Partial refresh: keep cached statuses of agents outside kwargs[:node].
    # reject returns a new Hash, but the status hashes themselves are still
    # shared with @statuses.
    if kwargs[:node]
      new_statuses = statuses.reject do |name, status|
        kwargs[:node].include?(name)
      end
    end

    # Send both queries at once; #value waits for each thread to finish.
    status_query_th = Thread.new { @master.serf.query(STATUS_QUERY, '', **kwargs) }
    packages_query_th = Thread.new { @master.serf.query(PACKAGES_QUERY, '', **kwargs) }
    status_response = status_query_th.value
    packages_response = packages_query_th.value

    # An agent whose status reply is not valid JSON gets no status entry
    # and is counted as failed.
    status_response["Responses"].each do |name, json|
      begin
        new_statuses[name] = JSON.parse(json)
      rescue JSON::ParserError => e
        logger.warn "Failed to parse status from #{name}: #{e.message}"
        new_failed_agents << name
        next
      end
    end

    # Merge PACKAGE_STATUS_KEYS from each packages reply into the agent's
    # status; replies from agents without a status entry are skipped.
    packages_response["Responses"].each do |name, json|
      next unless new_statuses[name]

      begin
        resp = JSON.parse(json)

        PACKAGE_STATUS_KEYS.each do |k|
          new_statuses[name][k] = resp[k]
        end
      rescue JSON::ParserError => e
        logger.warn "Failed to parse packages from #{name}: #{e.message}"
        next
      end
    end

    # Agents that have a status but did not answer the packages query keep
    # their previously cached package keys (when there were any).
    (new_statuses.keys - packages_response["Responses"].keys).each do |name|
      PACKAGE_STATUS_KEYS.each do |k|
        if @statuses[name] && @statuses[name][k]
          new_statuses[name][k] = @statuses[name][k]
        end
      end
    end

    # Converted to an Array so it can be diffed against (and later stored
    # like) the frozen Array in @failed_agents.
    new_failed_agents = new_failed_agents.to_a

    # Log membership and health transitions relative to the previous refresh.
    (new_agents.keys - @agents.keys).join(", ").tap do |agents|
      logger.info "Added agents: #{agents}" unless agents.empty?
    end

    (@agents.keys - new_agents.keys).join(", ").tap do |agents|
      logger.info "Removed agents: #{agents}" unless agents.empty?
    end

    (failed_agents - new_failed_agents).join(", ").tap do |agents|
      logger.info "Recovered agents: #{agents}" unless agents.empty?
    end

    (new_failed_agents - failed_agents).join(", ").tap do |agents|
      logger.info "Newly failed agents: #{agents}" unless agents.empty?
    end

    @agents = new_agents.freeze
    @failed_agents = new_failed_agents.freeze
    @statuses = new_statuses
    @last_refresh_at = Time.now
  }

  self
end
running?()
# File lib/mamiya/master/agent_monitor.rb, line 76
# Reports whether the background monitoring thread exists and is alive.
# Returns nil when no thread has been started; otherwise the thread's
# alive? result.
def running?
  thread = @thread
  return thread unless thread

  thread.alive?
end
start!()
# File lib/mamiya/master/agent_monitor.rb, line 62
# Starts the background monitoring thread. The thread repeatedly calls
# #work_loop and then sleeps @interval seconds, forever.
#
# Returns the monitoring thread. If the thread is already alive, this is a
# no-op that returns the existing thread. If a previous thread has died
# (for example because #work_loop re-raised with raise_exception: true),
# a fresh thread is started instead of handing back the dead one.
def start!
  return @thread if running?

  @thread = Thread.new do
    loop do
      work_loop
      sleep @interval
    end
  end
end
statuses(labels: nil)
# File lib/mamiya/master/agent_monitor.rb, line 42
# Returns the cached per-agent statuses (agent name => status hash).
#
# Without +labels+, returns the internal Hash itself (not a copy). With
# +labels+, returns a new Hash containing only agents whose status has a
# 'labels' entry accepted by Mamiya::Util::LabelMatcher::Simple#match?.
def statuses(labels: nil)
  return @statuses unless labels

  @statuses.select do |_name, status|
    agent_labels = status['labels']
    agent_labels &&
      Mamiya::Util::LabelMatcher::Simple.new(agent_labels).match?(labels)
  end
end
stop!()
# File lib/mamiya/master/agent_monitor.rb, line 71
# Kills the background monitoring thread if it is running and forgets it,
# so a later #start! creates a new one. Always returns nil.
def stop!
  current = @thread
  current.kill if running?
  @thread = nil
end
work_loop()
# File lib/mamiya/master/agent_monitor.rb, line 80
# Runs a single #refresh for the background thread.
#
# On success, returns #refresh's result. On failure, re-raises the error when
# the monitor was built with raise_exception: true. Otherwise it logs the
# error class, message and backtrace at FATAL level and returns, so the
# polling loop keeps going.
def work_loop
  begin
    refresh
  rescue Exception => error
    # Deliberately broad: any failure is reported without killing the
    # monitoring thread unless the caller asked for exceptions.
    raise error if @raise_exception

    logger.fatal "Periodical refreshing failed: #{error.class}: #{error.message}"
    error.backtrace.each { |frame| logger.fatal "\t#{frame}" }
  end
end

Private Instance Methods

logger()
# File lib/mamiya/master/agent_monitor.rb, line 200
# Returns the monitor's logger: the master's logger tagged with the
# 'agent-monitor' progname. Built on first use and cached in @logger.
def logger
  return @logger if @logger

  @logger = @master.logger.with_clean_progname['agent-monitor']
end