class Mamiya::Agent::TaskQueue

Constants

GRACEFUL_TIMEOUT
JOIN_TIMEOUT

Attributes

agent[R]
task_classes[R]
worker_threads[R]

Public Class Methods

new(agent, task_classes: [], logger: Mamiya::Logger.new) click to toggle source
# File lib/mamiya/agent/task_queue.rb, line 12
def initialize(agent, task_classes: [], logger: Mamiya::Logger.new)
  @agent = agent
  @task_classes = task_classes
  @external_queue = Queue.new
  @queues = {}
  @worker_threads = nil
  @statuses = nil
  @queueing_thread = nil
  @lifecycle_mutex = Mutex.new
  @terminate = false
  @logger = logger.with_clean_progname['task_queue']
end

Public Instance Methods

enqueue(task_name, task) click to toggle source
# File lib/mamiya/agent/task_queue.rb, line 85
def enqueue(task_name, task)
  raise Stopped, 'this task queue is stopped' unless running?

  if task['_labels'] && !agent.match?(task['_labels'])
    @logger.debug "skipping enqueue #{task_name.inspect}, #{task.inspect}, because agent doesn't match"
    return self
  end

  task.delete '_labels'
  task.delete 'task'
  task['task'] = task_name

  @logger.debug "enqueue #{task_name.inspect}, #{task.inspect}, #{@external_queue.inspect}"
  @external_queue << [task_name, task]
  self
end
running?() click to toggle source
# File lib/mamiya/agent/task_queue.rb, line 77
def running?
  @worker_threads && !@terminate
end
start!() click to toggle source
# File lib/mamiya/agent/task_queue.rb, line 27
def start!
  @lifecycle_mutex.synchronize do
    return if running?

    worker_threads = {}
    queues = {}
    statuses = {}

    @task_classes.each { |klass|
      name = klass.identifier.to_sym
      queue = queues[name] = Queue.new
      statuses[name] = {pending: [], lock: Mutex.new}
      th = worker_threads[name] = Thread.new(
        klass, queue,
        statuses[name],
        &method(:worker_loop)
      )
      th.abort_on_exception = true
    }

    @terminate = false
    @statuses = statuses 
    @queues = queues
    exqueue = @external_queue = Queue.new
    @queueing_thread = Thread.new(queues, exqueue, statuses, &method(:queueing_loop))
    @queueing_thread.abort_on_exception = true
    @worker_threads = worker_threads
  end
end
status() click to toggle source
# File lib/mamiya/agent/task_queue.rb, line 102
def status
  return nil unless running?
  Hash[@statuses.map do |name, st|
    [name, {
      queue: st[:pending].dup,
      working: st[:working] ? st[:working].dup : nil,
    }]
  end]
end
stop!(graceful = false) click to toggle source
# File lib/mamiya/agent/task_queue.rb, line 57
def stop!(graceful = false)
  @lifecycle_mutex.synchronize do
    return unless running?
    @terminate = true
    @queueing_thread.kill if @queueing_thread.alive?
    if graceful
      @worker_threads.each do |th|
        th.join(GRACEFUL_TIMEOUT)
      end
    end
    @worker_threads.each do |name, th|
      next unless th.alive?
      th.kill
      th.join(JOIN_TIMEOUT) 
    end
    @queues = nil
    @worker_threads = nil
  end
end
working?() click to toggle source
# File lib/mamiya/agent/task_queue.rb, line 81
def working?
  running? && status.any? { |name, stat| stat[:working] }
end

Private Instance Methods

queueing_loop(queues, external_queue, statuses) click to toggle source
# File lib/mamiya/agent/task_queue.rb, line 137
def queueing_loop(queues, external_queue, statuses)
  @logger.debug "queueing thread started #{external_queue.inspect}"
  while _ = external_queue.pop
    task_name, task = _

    break if @terminate

    queue = queues[task_name]
    unless queue
      @logger.debug "Ignoring task #{task_name} (queue not defined)"
      next
    end

    statuses[task_name][:lock].synchronize do
      statuses[task_name][:pending] << task
    end
    @logger.info "Queueing task #{task_name}: #{task.inspect}"
    queue << task
    break if @terminate
  end
  @logger.debug "queueing thread finish"
rescue Exception => e
  @logger.error "queueing thread error #{e.inspect}\n\t#{e.backtrace.join("\n\t")}"
  raise e
end
worker_loop(task_class, queue, status) click to toggle source
# File lib/mamiya/agent/task_queue.rb, line 114
def worker_loop(task_class, queue, status)
  while task = queue.pop
    break if @terminate
    begin
      status[:lock].synchronize do
        status[:pending].delete task

        task['start'] = Time.now.to_i
        status[:working] = task
      end
      task_logger = @logger.with_clean_progname
      task_class.new(self, task, agent: @agent, logger: task_logger).execute
    rescue Exception => e
      @logger.error "#{task_class} worker catched error: #{e}\n\t#{e.backtrace.join("\n\t")}"
    ensure
      status[:lock].synchronize do
        status[:working] = nil
      end
    end
    break if @terminate
  end
end