module Qs::Daemon::InstanceMethods

Attributes

daemon_data[R]
signals_redis_key[R]

Public Class Methods

new() click to toggle source
# File lib/qs/daemon.rb, line 28
# Builds the daemon from the class-level config: validates it, initializes
# Qs, copies the settings into a DaemonData and creates the redis client and
# the worker pool. The daemon begins in the :stop state with no thread.
#
# Raises InvalidError when the config fails validation; the error's
# backtrace is replaced with this method's caller stack before re-raising.
def initialize
  cfg = self.class.config
  begin
    cfg.validate!
  rescue InvalidError => err
    err.set_backtrace(caller)
    raise err
  end
  Qs.init

  data_args = {
    :name             => cfg.name,
    :pid_file         => cfg.pid_file,
    :shutdown_timeout => cfg.shutdown_timeout,
    :worker_class     => cfg.worker_class,
    :worker_params    => cfg.worker_params,
    :num_workers      => cfg.num_workers,
    :error_procs      => cfg.error_procs,
    :logger           => cfg.logger,
    :queues           => cfg.queues,
    :verbose_logging  => cfg.verbose_logging,
    :routes           => cfg.routes
  }
  @daemon_data = DaemonData.new(data_args)

  # the signals key is built from the daemon name, hostname and process id
  daemon_name = daemon_data.name
  host, pid   = Socket.gethostname, ::Process.pid
  @signals_redis_key = "signals:#{daemon_name}-#{host}-#{pid}"

  @thread           = nil
  @worker_available = WorkerAvailable.new
  @state            = State.new(:stop)

  # size the client at num workers + 1: one connection for fetching work
  # from redis plus at least one per worker, so each worker can requeue its
  # message when it is hard-shutdown
  client_args = Qs.redis_connect_hash.merge({
    :timeout => 1,
    :size    => daemon_data.num_workers + 1
  })
  @client = QsClient.new(client_args)

  pool_class = daemon_data.worker_class
  pool_args  = {
    :num_workers   => daemon_data.num_workers,
    :logger        => daemon_data.dwp_logger,
    :worker_params => daemon_data.worker_params.merge({
      :qs_daemon_data      => daemon_data,
      :qs_client           => @client,
      :qs_worker_available => @worker_available,
      :qs_logger           => logger
    })
  }
  @worker_pool = DatWorkerPool.new(pool_class, pool_args)
end

Public Instance Methods

halt(wait = false) click to toggle source
# File lib/qs/daemon.rb, line 119
# Moves a running daemon into the :halt state and wakes its thread so it
# notices the change. Does nothing (returns nil) when the daemon isn't
# running.
#
# wait - when truthy, block until the daemon thread has finished.
def halt(wait = false)
  if running?
    @state.set :halt
    wakeup_thread
    wait_for_shutdown if wait
  end
end
logger() click to toggle source
# File lib/qs/daemon.rb, line 91
# Returns the logger held by the daemon data.
def logger
  daemon_data.logger
end
name() click to toggle source
# File lib/qs/daemon.rb, line 79
# Returns the daemon's name as stored on the daemon data.
def name
  daemon_data.name
end
pid_file() click to toggle source
# File lib/qs/daemon.rb, line 87
# Returns the pid file setting stored on the daemon data.
def pid_file
  daemon_data.pid_file
end
process_label() click to toggle source
# File lib/qs/daemon.rb, line 83
# Returns the process label provided by the daemon data.
def process_label
  daemon_data.process_label
end
queue_redis_keys() click to toggle source
# File lib/qs/daemon.rb, line 95
# Returns the queue redis keys provided by the daemon data.
def queue_redis_keys
  daemon_data.queue_redis_keys
end
running?() click to toggle source
# File lib/qs/daemon.rb, line 99
# Returns true when the daemon thread exists and is alive, false otherwise.
def running?
  # teardown clears @thread from the daemon thread, so read it once; reading
  # it twice could see a thread on the check and nil on the `alive?` call
  thread = @thread
  !!(thread && thread.alive?)
end
start() click to toggle source
# File lib/qs/daemon.rb, line 103
# Puts the daemon in the :run state and makes sure its work-loop thread
# exists. Returns the thread; an already-assigned thread is reused rather
# than replaced.
def start
  # ping first to confirm redis is reachable; failing here is friendlier
  # than starting up and continuously erroring because it can't dequeue
  @client.ping
  @state.set :run

  existing = @thread
  return existing if existing
  @thread = Thread.new{ work_loop }
end
stop(wait = false) click to toggle source
# File lib/qs/daemon.rb, line 112
# Moves a running daemon into the :stop state and wakes its thread so it
# notices the change. Does nothing (returns nil) when the daemon isn't
# running.
#
# wait - when truthy, block until the daemon thread has finished.
def stop(wait = false)
  if running?
    @state.set :stop
    wakeup_thread
    wait_for_shutdown if wait
  end
end

Private Instance Methods

fetch_messages() click to toggle source
# File lib/qs/daemon.rb, line 146
# Performs one fetch cycle: waits for a free worker if needed, then blocks
# on redis for the next message and hands it to the worker pool. Returns
# without dequeueing when no worker is free or the daemon is no longer in
# the run state.
def fetch_messages
  # no free worker while running: block until availability is signaled
  @worker_available.wait if !@worker_pool.worker_available? && @state.run?
  ready = @worker_pool.worker_available? && @state.run?
  return unless ready

  # The queue keys are shuffled to avoid queue starvation: redis pulls
  # messages off the queues in the order they are passed to the command, so
  # a random order gives every queue a chance. The signals key is passed
  # first and 0 is the brpop timeout, which means block indefinitely.
  # Runtime errors are rescued so the daemon thread doesn't fail if redis is
  # temporarily down; sleeping afterwards keeps the thread from thrashing by
  # repeatedly erroring while redis is down.
  begin
    dequeue_args = [signals_redis_key, queue_redis_keys.shuffle, 0].flatten
    source_key, payload = @client.block_dequeue(*dequeue_args)
    # entries popped from the signals key are not work; they are dropped
    unless source_key == @signals_redis_key
      @worker_pool.push(QueueItem.new(source_key, payload))
    end
  rescue RuntimeError => err
    log "Error occurred while dequeueing", :error
    log "#{err.class}: #{err.message}", :error
    trace = err.backtrace || []
    trace.each{ |line| log(line, :error) }
    sleep FETCH_ERR_SLEEP_TIME
  end
end
log(message, level = :info) click to toggle source
# File lib/qs/daemon.rb, line 194
# Sends a "[Qs] "-prefixed message to the daemon's logger.
#
# message - the text to log (interpolated into the prefixed line).
# level   - the logger method to invoke; defaults to :info.
def log(message, level = :info)
  sink = logger
  sink.send(level, "[Qs] #{message}")
end
setup() click to toggle source
# File lib/qs/daemon.rb, line 140
# Prepares the daemon for its work loop: empties the signals list in redis
# so signals already on it are discarded, then starts the worker pool.
def setup
  stale_signals_key = signals_redis_key
  @client.clear(stale_signals_key)
  @worker_pool.start
end
teardown() click to toggle source
# File lib/qs/daemon.rb, line 173
# Shuts the worker pool down and pushes any work items it still holds back
# onto their redis queues. A halted daemon gives the pool a timeout of 0;
# otherwise the configured shutdown timeout is used. The thread reference
# is always cleared, even when shutdown or requeueing raises.
def teardown
  begin
    if @state.halt?
      timeout = 0
    else
      timeout = daemon_data.shutdown_timeout
    end
    @worker_pool.shutdown(timeout)

    log "Requeueing #{@worker_pool.work_items.size} message(s)"
    @worker_pool.work_items.each do |item|
      @client.prepend(item.queue_redis_key, item.encoded_payload)
    end
  ensure
    @thread = nil
  end
end
wait_for_shutdown() click to toggle source
# File lib/qs/daemon.rb, line 190
# Blocks until the daemon thread finishes. Returns the joined thread, or
# nil when there is no thread to wait on.
def wait_for_shutdown
  # teardown clears @thread from the daemon thread, so read it once; reading
  # it twice could pass the check and then call `join` on nil
  thread = @thread
  thread.join if thread
end
wakeup_thread() click to toggle source
# File lib/qs/daemon.rb, line 185
# Nudges the daemon thread out of whichever wait fetch_messages may be in:
# appends SIGNAL to the signals list in redis (one of the keys the blocking
# dequeue listens on) and signals worker availability.
def wakeup_thread
  signals_key = signals_redis_key
  @client.append(signals_key, SIGNAL)
  @worker_available.signal
end
work_loop() click to toggle source
# File lib/qs/daemon.rb, line 128
# Body of the daemon thread: runs setup, then fetches messages for as long
# as the state is :run. Any StandardError switches the state to :stop and is
# logged with its backtrace. teardown always runs on the way out.
def work_loop
  setup
  while @state.run?
    fetch_messages
  end
rescue StandardError => err
  @state.set :stop
  log "Error occurred while running the daemon, exiting", :error
  log "#{err.class}: #{err.message}", :error
  trace = err.backtrace || []
  trace.each{ |line| log(line, :error) }
ensure
  teardown
end