module Qs::Daemon::InstanceMethods
Attributes
daemon_data[R]
signals_redis_key[R]
Public Class Methods
new()
click to toggle source
# File lib/qs/daemon.rb, line 28 def initialize config = self.class.config begin config.validate! rescue InvalidError => exception exception.set_backtrace(caller) raise exception end Qs.init @daemon_data = DaemonData.new({ :name => config.name, :pid_file => config.pid_file, :shutdown_timeout => config.shutdown_timeout, :worker_class => config.worker_class, :worker_params => config.worker_params, :num_workers => config.num_workers, :error_procs => config.error_procs, :logger => config.logger, :queues => config.queues, :verbose_logging => config.verbose_logging, :routes => config.routes }) @signals_redis_key = "signals:#{self.daemon_data.name}-" \ "#{Socket.gethostname}-#{::Process.pid}" @thread = nil @worker_available = WorkerAvailable.new @state = State.new(:stop) # set the size of the client to the num workers + 1, this ensures we # have 1 connection for fetching work from redis and at least 1 # connection for each worker to requeue its message when hard-shutdown @client = QsClient.new(Qs.redis_connect_hash.merge({ :timeout => 1, :size => self.daemon_data.num_workers + 1 })) @worker_pool = DatWorkerPool.new(self.daemon_data.worker_class, { :num_workers => self.daemon_data.num_workers, :logger => self.daemon_data.dwp_logger, :worker_params => self.daemon_data.worker_params.merge({ :qs_daemon_data => self.daemon_data, :qs_client => @client, :qs_worker_available => @worker_available, :qs_logger => self.logger }) }) end
Public Instance Methods
halt(wait = false)
click to toggle source
# File lib/qs/daemon.rb, line 119 def halt(wait = false) return unless self.running? @state.set :halt wakeup_thread wait_for_shutdown if wait end
logger()
click to toggle source
# File lib/qs/daemon.rb, line 91 def logger @daemon_data.logger end
name()
click to toggle source
# File lib/qs/daemon.rb, line 79 def name @daemon_data.name end
pid_file()
click to toggle source
# File lib/qs/daemon.rb, line 87 def pid_file @daemon_data.pid_file end
process_label()
click to toggle source
# File lib/qs/daemon.rb, line 83 def process_label @daemon_data.process_label end
queue_redis_keys()
click to toggle source
# File lib/qs/daemon.rb, line 95 def queue_redis_keys @daemon_data.queue_redis_keys end
running?()
click to toggle source
# File lib/qs/daemon.rb, line 99 def running? !!(@thread && @thread.alive?) end
start()
click to toggle source
# File lib/qs/daemon.rb, line 103 def start # ping to check that it can communicate with redis before running, # this is friendlier than starting and continously erroring because # it can't dequeue @client.ping @state.set :run @thread ||= Thread.new{ work_loop } end
stop(wait = false)
click to toggle source
# File lib/qs/daemon.rb, line 112 def stop(wait = false) return unless self.running? @state.set :stop wakeup_thread wait_for_shutdown if wait end
Private Instance Methods
fetch_messages()
click to toggle source
# File lib/qs/daemon.rb, line 146 def fetch_messages if !@worker_pool.worker_available? && @state.run? @worker_available.wait end return unless @worker_pool.worker_available? && @state.run? # shuffle the queue redis keys to avoid queue starvation, redis will # pull messages off queues in the order they are passed to the command, # by shuffling we ensure they are randomly ordered so every queue # should get a chance; use 0 for the brpop timeout which means block # indefinitely; rescue runtime errors so the daemon thread doesn't fail # if redis is temporarily down, sleep for a second to keep the thread # from thrashing by repeatedly erroring if redis is down begin args = [self.signals_redis_key, self.queue_redis_keys.shuffle, 0].flatten redis_key, encoded_payload = @client.block_dequeue(*args) if redis_key != @signals_redis_key @worker_pool.push(QueueItem.new(redis_key, encoded_payload)) end rescue RuntimeError => exception log "Error occurred while dequeueing", :error log "#{exception.class}: #{exception.message}", :error (exception.backtrace || []).each{ |l| log(l, :error) } sleep FETCH_ERR_SLEEP_TIME end end
log(message, level = :info)
click to toggle source
# File lib/qs/daemon.rb, line 194 def log(message, level = :info) self.logger.send(level, "[Qs] #{message}") end
setup()
click to toggle source
# File lib/qs/daemon.rb, line 140 def setup # clear any signals that are already on the signals list in redis @client.clear(self.signals_redis_key) @worker_pool.start end
teardown()
click to toggle source
# File lib/qs/daemon.rb, line 173 def teardown timeout = @state.halt? ? 0 : self.daemon_data.shutdown_timeout @worker_pool.shutdown(timeout) log "Requeueing #{@worker_pool.work_items.size} message(s)" @worker_pool.work_items.each do |qi| @client.prepend(qi.queue_redis_key, qi.encoded_payload) end ensure @thread = nil end
wait_for_shutdown()
click to toggle source
# File lib/qs/daemon.rb, line 190 def wait_for_shutdown @thread.join if @thread end
wakeup_thread()
click to toggle source
# File lib/qs/daemon.rb, line 185 def wakeup_thread @client.append(self.signals_redis_key, SIGNAL) @worker_available.signal end
work_loop()
click to toggle source
# File lib/qs/daemon.rb, line 128 def work_loop setup fetch_messages while @state.run? rescue StandardError => exception @state.set :stop log "Error occurred while running the daemon, exiting", :error log "#{exception.class}: #{exception.message}", :error (exception.backtrace || []).each{ |l| log(l, :error) } ensure teardown end