class Qs::PayloadHandler

Attributes

daemon_data[R]
logger[R]
queue_item[R]

Public Class Methods

new(daemon_data, queue_item) click to toggle source
# File lib/qs/payload_handler.rb, line 15
# Stores the daemon data and queue item and builds the logger used by the
# `log_*` helpers.
#
# daemon_data - must respond to `logger` and `verbose_logging`; both values
#               are handed to Qs::Logger.new.
# queue_item  - the item this handler will process when `run` is called.
def initialize(daemon_data, queue_item)
  @daemon_data, @queue_item = daemon_data, queue_item
  @logger = Qs::Logger.new(@daemon_data.logger, @daemon_data.verbose_logging)
end

Public Instance Methods

run() click to toggle source
# File lib/qs/payload_handler.rb, line 24
# Processes the queue item: logs receipt, times the call to `run!`, records
# the elapsed real time on the queue item (wrapped by RoundedTime) and then
# logs completion.
#
# If `run!` raises, the exception propagates out of the benchmark block, so
# neither the time assignment nor the completion logging happens.
def run
  log_received
  timing = Benchmark.measure do
    run!(@daemon_data, @queue_item)
  end
  @queue_item.time_taken = RoundedTime.new(timing.real)
  log_complete(@queue_item)
end

Private Instance Methods

build_summary_line(queue_item) click to toggle source
# File lib/qs/payload_handler.rb, line 109
# Builds the summary-line object for a processed queue item.
#
# The args hash always carries 'time' and 'handler'; 'error' is added when
# the item has an exception and 'params' when it has a message.
#
# With a message, the work is dispatched to the `<type>_summary_line` helper,
# where <type> comes from Qs::Payload.type_method_name(message.payload_type).
# Without a message, an UnknownSummaryLine is returned.
def build_summary_line(queue_item)
  args = {
    'time'    => queue_item.time_taken,
    'handler' => queue_item.handler_class
  }

  failure = queue_item.exception
  args['error'] = "#{failure.class}: #{failure.message}" if failure

  message = queue_item.message
  return UnknownSummaryLine.new(args) unless message

  args['params'] = message.params
  builder_name = "#{Qs::Payload.type_method_name(message.payload_type)}_summary_line"
  # `send` is used because the per-type builders are private helpers.
  send(builder_name, message, args)
end
event_summary_line(event, summary_line_args) click to toggle source
# File lib/qs/payload_handler.rb, line 133
# Wraps an event message and the shared summary args in an EventSummaryLine.
# Reached via the dynamic dispatch in `build_summary_line`.
def event_summary_line(event, summary_line_args)
  summary_line = EventSummaryLine.new(event, summary_line_args)
  summary_line
end
handle_exception(exception, daemon_data, queue_item) click to toggle source
# File lib/qs/payload_handler.rb, line 57
# Runs a Qs::ErrorHandler for `exception`, giving it the daemon data and the
# queue item's current state as context, then stores the handler's resulting
# exception on the queue item and logs it.
#
# The logged/stored exception is whatever the error handler exposes via
# `exception` after running, which need not be the one passed in.
def handle_exception(exception, daemon_data, queue_item)
  context = {
    :daemon_data     => daemon_data,
    :queue_redis_key => queue_item.queue_redis_key,
    :encoded_payload => queue_item.encoded_payload,
    :message         => queue_item.message,
    :handler_class   => queue_item.handler_class
  }
  handler = Qs::ErrorHandler.new(exception, context)
  handler.run

  queue_item.exception = handler.exception
  log_exception(queue_item.exception)
end
job_summary_line(job, summary_line_args) click to toggle source
# File lib/qs/payload_handler.rb, line 129
# Wraps a job message and the shared summary args in a JobSummaryLine.
# Reached via the dynamic dispatch in `build_summary_line`.
def job_summary_line(job, summary_line_args)
  summary_line = JobSummaryLine.new(job, summary_line_args)
  summary_line
end
log_complete(queue_item) click to toggle source
# File lib/qs/payload_handler.rb, line 91
# Writes the verbose "Completed" banner with the item's time taken, then
# writes the summary line built from the queue item to the summary logger.
def log_complete(queue_item)
  log_verbose("===== Completed in #{queue_item.time_taken}ms =====")
  summary_line = build_summary_line(queue_item)
  log_summary(summary_line)
end
log_event(event) click to toggle source
# File lib/qs/payload_handler.rb, line 82
# Verbose-logs the inspected route name and publisher of an event message.
def log_event(event)
  log_verbose("  Event:     #{event.route_name.inspect}")
  log_verbose("  Publisher: #{event.publisher.inspect}")
end
log_exception(exception) click to toggle source
# File lib/qs/payload_handler.rb, line 96
# Verbose-logs an exception at :error level: first "Class: message", then one
# entry per backtrace line. A nil backtrace is treated as empty.
def log_exception(exception)
  log_verbose("#{exception.class}: #{exception.message}", :error)
  backtrace_lines = exception.backtrace || []
  backtrace_lines.each{ |line| log_verbose(line, :error) }
end
log_handler_class(handler_class) click to toggle source
# File lib/qs/payload_handler.rb, line 87
# Verbose-logs the handler class chosen for the current message.
def log_handler_class(handler_class)
  log_verbose("  Handler:   #{handler_class}")
end
log_job(job) click to toggle source
# File lib/qs/payload_handler.rb, line 78
# Verbose-logs the inspected route name of a job message.
def log_job(job)
  log_verbose("  Job:       #{job.route_name.inspect}")
end
log_message(message) click to toggle source
# File lib/qs/payload_handler.rb, line 73
# Verbose-logs a deserialized message: first the type-specific details via
# the `log_<type>` helper (type name from Qs::Payload.type_method_name), then
# the inspected params.
def log_message(message)
  type_logger = "log_#{Qs::Payload.type_method_name(message.payload_type)}"
  # `send` is used because the per-type loggers are private helpers.
  send(type_logger, message)
  log_verbose("  Params:    #{message.params.inspect}")
end
log_received() click to toggle source
# File lib/qs/payload_handler.rb, line 69
# Verbose-logs the banner that marks the start of processing a message.
def log_received
  log_verbose("===== Received message =====")
end
log_summary(message, level = :info) click to toggle source
# File lib/qs/payload_handler.rb, line 105
# Sends a "[Qs] "-prefixed message to the logger's summary channel.
#
# message - anything interpolable into a string.
# level   - name of the logger method to invoke (defaults to :info).
def log_summary(message, level = :info)
  summary_logger = logger.summary
  summary_logger.send(level, "[Qs] #{message}")
end
log_verbose(message, level = :info) click to toggle source
# File lib/qs/payload_handler.rb, line 101
# Sends a "[Qs] "-prefixed message to the logger's verbose channel.
#
# message - anything interpolable into a string.
# level   - name of the logger method to invoke (defaults to :info).
def log_verbose(message, level = :info)
  verbose_logger = logger.verbose
  verbose_logger.send(level, "[Qs] #{message}")
end
run!(daemon_data, queue_item) click to toggle source
# File lib/qs/payload_handler.rb, line 33
# Does the actual work for one queue item: deserializes its payload, looks up
# the route for the message, and runs the route's handler.
#
# Progress is recorded on the queue item step by step (`started`, `message`,
# `handler_class`, `finished`) so that, on failure, the error handling sees
# exactly how far processing got. The statement order below is deliberate.
#
# Error handling:
# * DatWorkerPool::ShutdownError - if the item was started, a ShutdownError
#   carrying the same message and backtrace is passed to `handle_exception`;
#   the original shutdown exception is then always re-raised.
# * Any class in Qs::ErrorHandler::STANDARD_ERROR_CLASSES - passed to
#   `handle_exception` and not re-raised.
def run!(daemon_data, queue_item)
  begin
    queue_item.started = true

    # Log before assigning so a failure while logging leaves `message` unset.
    decoded = Qs::Payload.deserialize(queue_item.encoded_payload)
    log_message(decoded)
    queue_item.message = decoded

    matched_route = daemon_data.route_for(decoded.route_id)
    log_handler_class(matched_route.handler_class)
    queue_item.handler_class = matched_route.handler_class

    matched_route.run(decoded, daemon_data)
    queue_item.finished = true
  rescue DatWorkerPool::ShutdownError => shutdown
    if queue_item.started
      wrapped = ShutdownError.new(shutdown.message)
      wrapped.set_backtrace(shutdown.backtrace)
      handle_exception(wrapped, daemon_data, queue_item)
    end
    raise shutdown
  rescue *Qs::ErrorHandler::STANDARD_ERROR_CLASSES => failure
    handle_exception(failure, daemon_data, queue_item)
  end
end