class Mamiya::Agent

Attributes

config[R]
logger[R]
serf[R]

Public Class Methods

new(config, logger: Mamiya::Logger.new, events_only: nil) click to toggle source
# File lib/mamiya/agent.rb, line 29
# Builds an agent around the given configuration.
#
# config      - configuration object; stored as-is and read by init_serf.
# logger      - object responding to #[]; logger['agent'] becomes this
#               agent's logger.
# events_only - optional collection of matchers, stored for the event-type
#               filter applied in user_event_handler (nil disables it).
def initialize(config, logger: Mamiya::Logger.new, events_only: nil)
  # @config has to be in place before init_serf runs, because init_serf
  # reads the serf settings through the config reader.
  @config = config
  @serf = init_serf

  @events_only = events_only
  @terminate = false
  @trigger_lock = Mutex.new
  @logger = logger['agent']
end

Public Instance Methods

currents() click to toggle source
# File lib/mamiya/agent.rb, line 171
def currents
  # TODO: when the target is in outside?
  Hash[config.applications.map do |name, app|
    deploy_to = Pathname.new(app[:deploy_to])
    current = deploy_to.join('current')
    next unless current.exist?

    [
      name,
      current.realpath.basename.to_s
    ]
  end.compact]
end
existing_packages() click to toggle source

Returns a hash of the existing valid packages, keyed by app name. A package is considered valid when both its .json file and its .tar.gz tarball exist.

# File lib/mamiya/agent.rb, line 111
# Lists the complete packages found under config[:packages_dir].
#
# Files are expected at "<packages_dir>/<app>/<package>.tar.gz" and
# "<packages_dir>/<app>/<package>.json". A package name is reported only when
# both files are present.
#
# Returns a Hash of app directory name => sorted Array of package names.
# An app directory containing only incomplete packages maps to [].
def existing_packages
  pattern = File.join(config[:packages_dir], '*', '*.{tar.gz,json}')
  # The second-to-last path component is the app directory name.
  files_by_app = Dir[pattern].group_by { |file| file.split(File::SEPARATOR)[-2] }

  files_by_app.each_with_object({}) do |(app, files), result|
    files_by_package = files.group_by { |file|
      File.basename(file).sub(/\.(?:tar\.gz|json)\z/, '')
    }

    complete = files_by_package.select { |_package, group|
      group.any? { |file| file.end_with?(".tar.gz") } &&
        group.any? { |file| file.end_with?(".json") }
    }

    result[app] = complete.keys.sort
  end
end
existing_prereleases() click to toggle source
# File lib/mamiya/agent.rb, line 137
# Lists prepared prereleases found under config[:prereleases_dir].
#
# Every entry at "<prereleases_dir>/<app>/<name>" that contains a
# ".mamiya.prepared" file is reported.
#
# Returns a Hash of app directory name => sorted Array of prerelease names.
# An app directory with entries but no prepared ones maps to [].
def existing_prereleases
  entries = Dir[File.join(config[:prereleases_dir], '*', '*')]
  # The second-to-last path component is the app directory name.
  entries_by_app = entries.group_by { |entry| entry.split(File::SEPARATOR)[-2] }

  entries_by_app.each_with_object({}) do |(app, candidates), result|
    prepared = candidates.select do |candidate|
      File.exist?(File.join(candidate, '.mamiya.prepared'))
    end

    result[app] = prepared.map { |candidate| File.basename(candidate) }.sort
  end
end
labels() click to toggle source
# File lib/mamiya/agent.rb, line 102
# Returns the result of indexing config.labels with an empty array.
# NOTE(review): presumably config.labels is a proc-like object and [] is the
# list of labels to resolve from -- confirm against the configuration class.
def labels
  label_source = config.labels
  label_source[[]]
end
releases() click to toggle source
# File lib/mamiya/agent.rb, line 156
def releases
  Hash[config.applications.map do |name, app|
    deploy_to = Pathname.new(app[:deploy_to])
    releases = deploy_to.join('releases')
    next [name, []] unless releases.exist?

    [
      name,
      releases.children.map do |release|
        release.basename.to_s
      end.sort
    ]
  end.compact]
end
run!() click to toggle source
# File lib/mamiya/agent.rb, line 52
# Starts the agent and blocks until a stop is requested.
#
# The @terminate flag (set by stop!) is polled once per second; as soon as it
# is set, terminate is called and the method returns nil.
def run!
  logger.info "Starting..."
  start
  logger.info "Started."

  sleep 1 until @terminate

  terminate
  nil
end
start() click to toggle source
# File lib/mamiya/agent.rb, line 70
# Starts the agent's components: serf first, then the task queue.
# Returns whatever task_queue_start returns.
def start
  serf_start
  task_queue_start
end
status(packages: true) click to toggle source

Returns agent status. Used for HTTP API and `serf query` inspection.

# File lib/mamiya/agent.rb, line 84
# Builds the agent status report.
#
# packages - when true (the default) the report also carries :packages,
#            :prereleases, :releases and :currents.
#
# Returns a Hash with :name, :version, :labels and :queues, plus the four
# package-related keys when requested.
def status(packages: true)
  # Keep this signature in sync with Master#status when changing it.
  report = {
    name: serf.name,
    version: Mamiya::VERSION,
    labels: labels,
    queues: task_queue.status,
  }
  return report unless packages

  report.merge!(
    packages: existing_packages,
    prereleases: existing_prereleases,
    releases: releases,
    currents: currents,
  )
end
stop!() click to toggle source
# File lib/mamiya/agent.rb, line 66
# Requests shutdown by setting the @terminate flag. The flag is polled by the
# run! loop, which then calls terminate.
def stop!
  @terminate = true
end
task_queue() click to toggle source
# File lib/mamiya/agent.rb, line 42
# Returns the agent's task queue, building it on first use.
#
# The queue is created with this agent, the agent's logger and the list of
# task classes it should handle, then memoized in @task_queue.
def task_queue
  return @task_queue if @task_queue

  task_classes = [
    Mamiya::Agent::Tasks::Fetch,
    Mamiya::Agent::Tasks::Prepare,
    Mamiya::Agent::Tasks::Clean,
    Mamiya::Agent::Tasks::Switch,
    Mamiya::Agent::Tasks::Ping,
  ]

  @task_queue = Mamiya::Agent::TaskQueue.new(self, logger: logger, task_classes: task_classes)
end
terminate() click to toggle source
# File lib/mamiya/agent.rb, line 75
# Stops serf and the task queue, then clears the @terminate flag.
#
# The task queue is stopped even when stopping serf raises, so that a serf
# failure cannot leave the queue running. The serf error still propagates to
# the caller. @terminate is reset to false in every case.
#
# Returns the result of task_queue.stop! when nothing raised.
def terminate
  begin
    serf.stop!
  ensure
    # Runs whether or not serf.stop! raised.
    queue_result = task_queue.stop!
  end

  queue_result
ensure
  @terminate = false
end
trigger(type, action: nil, coalesce: true, **payload) click to toggle source
# File lib/mamiya/agent.rb, line 185
# Sends a serf user event named "mamiya:<type>" or "mamiya:<type>:<action>".
#
# type     - event type, interpolated into the event name.
# action   - optional action suffix; omitted from the name when nil/false.
# coalesce - forwarded to serf.event.
# payload  - remaining keywords; serialized to JSON after :name has been set
#            to this agent's serf name (overriding any :name passed in).
#
# Sending is serialized through @trigger_lock. Returns what serf.event returns.
def trigger(type, action: nil, coalesce: true, **payload)
  event_name = action ? "mamiya:#{type}:#{action}" : "mamiya:#{type}"
  body = payload.merge(name: serf.name).to_json

  @trigger_lock.synchronize do
    logger.debug "Send serf event #{event_name}(coalesce=#{coalesce}): #{body}"
    serf.event(event_name, body, coalesce: coalesce)
  end
end

Private Instance Methods

init_serf() click to toggle source
# File lib/mamiya/agent.rb, line 199
# Creates the Villein (serf) agent and wires up its callbacks.
#
# The agent is constructed from config[:serf][:agent] (or no options when
# that is missing). User events are forwarded to user_event_handler, and two
# query responders are registered:
#   'mamiya:status'   - JSON of status(packages: false)
#   'mamiya:packages' - JSON of packages, prereleases, releases and currents
#
# Returns the configured (not yet started) agent.
def init_serf
  serf_section = config[:serf]
  agent_config = serf_section ? serf_section[:agent] : nil
  agent_config ||= {}
  # agent_config.merge!(log: $stderr)

  agent = Villein::Agent.new(**agent_config)

  agent.on_user_event { |event| user_event_handler(event) }

  agent.respond('mamiya:status') { |_event| status(packages: false).to_json }

  agent.respond('mamiya:packages') do |_event|
    {
      'packages' => existing_packages,
      'prereleases' => existing_prereleases,
      'releases' => releases,
      'currents' => currents,
    }.to_json
  end

  agent
end
serf_start() click to toggle source
# File lib/mamiya/agent.rb, line 222
# Starts the serf agent, registers its auto-stop, and blocks until serf
# reports ready. Debug log lines bracket the sequence.
def serf_start
  logger.debug "Starting serf"

  @serf.tap do |agent|
    agent.start!
    agent.auto_stop
    agent.wait_for_ready
  end

  logger.debug "Serf became ready"
end
task_queue_start() click to toggle source
# File lib/mamiya/agent.rb, line 232
# Writes a debug log line, then starts the task queue (which the task_queue
# accessor builds on first use). Returns the result of task_queue.start!.
def task_queue_start
  logger.debug "Starting task_queue"
  task_queue.start!
end
user_event_handler(event) click to toggle source
# File lib/mamiya/agent.rb, line 237
# Dispatches a serf user event to the matching handler class in Handlers.
#
# Event names have the form "mamiya:<type>[:<action>]". Events without the
# "mamiya:" prefix are ignored. <type> is turned into a class name
# (capitalized, with "-x" sequences upcased: "foo-bar" => "FooBar") and looked
# up directly inside Handlers. <action> names the handler method to call and
# defaults to run!.
#
# event - object responding to #user_event (String) and #payload (JSON String).
#
# A payload that is not valid JSON causes the event to be discarded with a
# warning. Any other error is logged at fatal level with its backtrace and is
# only re-raised when the process was started by rspec.
def user_event_handler(event)
  user_event = event.user_event
  return unless user_event.start_with?('mamiya:')

  # Parse only after the prefix check so events belonging to other
  # applications are ignored without touching their payload. The rescue is
  # scoped to the parse so that a JSON error raised inside a handler is not
  # misreported as an invalid event payload.
  begin
    payload = JSON.parse(event.payload)
  rescue JSON::ParserError
    logger.warn("Discarded event[#{event.user_event}] with invalid payload (unable to parse as json)")
    return
  end

  type, action = user_event.sub(/\Amamiya:/, '').split(/:/, 2)

  return if @events_only && @events_only.none? { |matcher| matcher === type }

  class_name = type.capitalize.gsub(/-./) { |match| match[1].upcase }

  if config.debug_all_events
    logger.debug "Received user event #{type}"
    logger.debug payload.inspect
  end

  # inherit=false restricts the lookup to constants defined in Handlers
  # itself, so an event type such as "string" or "object" cannot resolve to a
  # top-level class.
  if Handlers.const_defined?(class_name, false)
    handler = Handlers.const_get(class_name, false).new(self, event)
    meth = action || :run!
    if handler.respond_to?(meth)
      # public_send matches the respond_to? check above (public methods only).
      handler.public_send(meth)
    elsif config.debug_all_events
      logger.debug "Handler #{class_name} doesn't respond to #{meth}, skipping"
    end
  else
    #logger.warn("Discarded event[#{event.user_event}] because we don't handle it")
  end
rescue Exception => e
  # NOTE(review): rescuing Exception also traps SignalException/SystemExit.
  # Kept as-is so a failing handler never escapes the serf callback; consider
  # narrowing to StandardError once the callback's threading is confirmed.
  logger.fatal("Error during handling event: #{e.inspect}")
  e.backtrace.each do |line|
    logger.fatal "\t#{line}"
  end

  raise if $0.end_with?('rspec')
end