class Qs::PayloadHandler
Attributes
daemon_data[R]
logger[R]
queue_item[R]
Public Class Methods
new(daemon_data, queue_item)
click to toggle source
# File lib/qs/payload_handler.rb, line 15 def initialize(daemon_data, queue_item) @daemon_data = daemon_data @queue_item = queue_item @logger = Qs::Logger.new( @daemon_data.logger, @daemon_data.verbose_logging ) end
Public Instance Methods
run()
click to toggle source
# File lib/qs/payload_handler.rb, line 24 def run log_received benchmark = Benchmark.measure{ run!(@daemon_data, @queue_item) } @queue_item.time_taken = RoundedTime.new(benchmark.real) log_complete(@queue_item) end
Private Instance Methods
build_summary_line(queue_item)
click to toggle source
# File lib/qs/payload_handler.rb, line 109 def build_summary_line(queue_item) summary_line_args = { 'time' => queue_item.time_taken, 'handler' => queue_item.handler_class } if (exception = queue_item.exception) summary_line_args['error'] = "#{exception.class}: #{exception.message}" end if (message = queue_item.message) summary_line_args['params'] = message.params self.send( "#{Qs::Payload.type_method_name(message.payload_type)}_summary_line", message, summary_line_args ) else UnknownSummaryLine.new(summary_line_args) end end
event_summary_line(event, summary_line_args)
click to toggle source
# File lib/qs/payload_handler.rb, line 133 def event_summary_line(event, summary_line_args) EventSummaryLine.new(event, summary_line_args) end
handle_exception(exception, daemon_data, queue_item)
click to toggle source
# File lib/qs/payload_handler.rb, line 57 def handle_exception(exception, daemon_data, queue_item) error_handler = Qs::ErrorHandler.new(exception, { :daemon_data => daemon_data, :queue_redis_key => queue_item.queue_redis_key, :encoded_payload => queue_item.encoded_payload, :message => queue_item.message, :handler_class => queue_item.handler_class }).tap(&:run) queue_item.exception = error_handler.exception log_exception(queue_item.exception) end
job_summary_line(job, summary_line_args)
click to toggle source
# File lib/qs/payload_handler.rb, line 129 def job_summary_line(job, summary_line_args) JobSummaryLine.new(job, summary_line_args) end
log_complete(queue_item)
click to toggle source
# File lib/qs/payload_handler.rb, line 91 def log_complete(queue_item) log_verbose "===== Completed in #{queue_item.time_taken}ms =====" log_summary build_summary_line(queue_item) end
log_event(event)
click to toggle source
# File lib/qs/payload_handler.rb, line 82 def log_event(event) log_verbose " Event: #{event.route_name.inspect}" log_verbose " Publisher: #{event.publisher.inspect}" end
log_exception(exception)
click to toggle source
# File lib/qs/payload_handler.rb, line 96 def log_exception(exception) log_verbose("#{exception.class}: #{exception.message}", :error) (exception.backtrace || []).each{ |l| log_verbose(l, :error) } end
log_handler_class(handler_class)
click to toggle source
# File lib/qs/payload_handler.rb, line 87 def log_handler_class(handler_class) log_verbose " Handler: #{handler_class}" end
log_job(job)
click to toggle source
# File lib/qs/payload_handler.rb, line 78 def log_job(job) log_verbose " Job: #{job.route_name.inspect}" end
log_message(message)
click to toggle source
# File lib/qs/payload_handler.rb, line 73 def log_message(message) self.send("log_#{Qs::Payload.type_method_name(message.payload_type)}", message) log_verbose " Params: #{message.params.inspect}" end
log_received()
click to toggle source
# File lib/qs/payload_handler.rb, line 69 def log_received log_verbose "===== Received message =====" end
log_summary(message, level = :info)
click to toggle source
# File lib/qs/payload_handler.rb, line 105 def log_summary(message, level = :info) self.logger.summary.send(level, "[Qs] #{message}") end
log_verbose(message, level = :info)
click to toggle source
# File lib/qs/payload_handler.rb, line 101 def log_verbose(message, level = :info) self.logger.verbose.send(level, "[Qs] #{message}") end
run!(daemon_data, queue_item)
click to toggle source
# File lib/qs/payload_handler.rb, line 33 def run!(daemon_data, queue_item) queue_item.started = true message = Qs::Payload.deserialize(queue_item.encoded_payload) log_message(message) queue_item.message = message route = daemon_data.route_for(message.route_id) log_handler_class(route.handler_class) queue_item.handler_class = route.handler_class route.run(message, daemon_data) queue_item.finished = true rescue DatWorkerPool::ShutdownError => exception if queue_item.started error = ShutdownError.new(exception.message) error.set_backtrace(exception.backtrace) handle_exception(error, daemon_data, queue_item) end raise exception rescue *Qs::ErrorHandler::STANDARD_ERROR_CLASSES => exception handle_exception(exception, daemon_data, queue_item) end