class Toiler::Actor::Processor
Responsible for processing SQS messages and notifying the Fetcher
when done.
Attributes
body_parser[RW]
executing[RW]
extend_callback[RW]
fetcher[RW]
queue[RW]
thread[RW]
worker_class[RW]
Public Class Methods
new(queue)
click to toggle source
# File lib/toiler/actor/processor.rb, line 13
# Builds a processor bound to +queue+.
#
# The worker class is looked up in Toiler.worker_class_registry under the
# queue, the processor starts idle (not executing, no thread recorded) and
# the worker's options are loaded through init_options.
#
# @param queue the key used to look up the worker class in the registry
def initialize(queue)
  @executing = false
  @thread = nil
  @queue = queue
  @worker_class = Toiler.worker_class_registry[queue]
  init_options
end
Public Instance Methods
default_executor()
click to toggle source
# File lib/toiler/actor/processor.rb, line 21
# Executor this processor runs on.
#
# @return whatever Concurrent.global_io_executor returns
# NOTE(review): presumably overrides the actor framework's default executor
# so that blocking work runs on the IO pool - confirm against the base class.
def default_executor
  Concurrent.global_io_executor
end
executing?()
click to toggle source
# File lib/toiler/actor/processor.rb, line 37
# Whether a message is currently being processed.
#
# @return the value of @executing (set to true by process_init and back to
#   false by process_cleanup)
def executing?
  @executing
end
on_message(msg)
click to toggle source
# File lib/toiler/actor/processor.rb, line 29
# Entry point for messages sent to this processor.
#
# +msg+ is destructured into a method name followed by its arguments and
# dispatched with +send+, so private methods of the processor can be targeted.
# Any failure is logged with class, message and backtrace; nothing is re-raised.
#
# @param msg [Array] method name followed by the arguments for that method
def on_message(msg)
  action, *params = msg
  send(action, *params)
rescue StandardError, SystemStackError => e
  # SystemStackError is rescued as well: if clients misbehave and cause a
  # "stack level too deep" exception, we should be able to recover.
  trace = e.backtrace.join("\n")
  error "Processor #{queue} failed processing, reason: #{e.class}: #{e.message}\n#{trace}"
end
Private Instance Methods
auto_delete?()
click to toggle source
# File lib/toiler/actor/processor.rb, line 55
# Whether messages should be deleted after the worker performs.
#
# @return the value of @auto_delete (assigned in init_options from the
#   worker class)
def auto_delete?
  @auto_delete
end
auto_visibility_timeout?()
click to toggle source
# File lib/toiler/actor/processor.rb, line 51
# Whether the message visibility timeout should be extended automatically
# while the worker is performing.
#
# @return the value of @auto_visibility_timeout (assigned in init_options
#   from the worker class)
def auto_visibility_timeout?
  @auto_visibility_timeout
end
get_body(sqs_msg)
click to toggle source
# File lib/toiler/actor/processor.rb, line 109
# Parses the body of a message, or of every message in a batch.
#
# @param sqs_msg a single message, or an Array of messages
# @return the parsed body, or an Array of parsed bodies when given an Array
def get_body(sqs_msg)
  return parse_body(sqs_msg) unless sqs_msg.is_a?(Array)

  sqs_msg.map { |message| parse_body(message) }
end
init_options()
click to toggle source
# File lib/toiler/actor/processor.rb, line 43
# Caches the worker class settings this processor needs:
# the auto visibility timeout and auto delete flags, plus the +:parser+ and
# +:on_visibility_extend+ entries of the worker's toiler_options.
def init_options
  worker = @worker_class
  @auto_visibility_timeout = worker.auto_visibility_timeout?
  @auto_delete = worker.auto_delete?

  options = worker.toiler_options
  @body_parser = options[:parser]
  @extend_callback = options[:on_visibility_extend]
end
parse_body(sqs_msg)
click to toggle source
# File lib/toiler/actor/processor.rb, line 117
# Parses the body of a single message according to the configured body parser.
#
# * +:json+          - the body is parsed with JSON.parse
# * a Proc           - the proc is called with the whole message
# * +:text+ or +nil+ - the raw body is returned
# * anything else    - the parser's +load+ is called with the body
#
# @param sqs_msg a message responding to +body+
# @return the parsed body
# @raise [RuntimeError] "Error parsing the message body: ..." when any
#   StandardError is raised while parsing
def parse_body(sqs_msg)
  parser = body_parser
  case parser
  when :json
    JSON.parse(sqs_msg.body)
  when Proc
    parser.call(sqs_msg)
  when :text, nil
    sqs_msg.body
  else
    parser.load(sqs_msg.body)
  end
rescue StandardError => e
  raise "Error parsing the message body: #{e.message}"
end
process(visibility, sqs_msg)
click to toggle source
# File lib/toiler/actor/processor.rb, line 59
# Processes one message (or one batch of messages) with a fresh worker.
#
# Marks the processor as busy, parses the body, starts the visibility
# extender (if enabled), calls the worker's +perform+ with the message and
# its parsed body and, when auto delete is enabled, deletes the message(s).
# process_cleanup always runs afterwards, even when the worker raises.
#
# @param visibility the queue visibility timeout, forwarded to visibility_extender
# @param sqs_msg a single message, or an Array of messages for a batch
def process(visibility, sqs_msg)
  process_init
  worker = @worker_class.new
  body = get_body(sqs_msg)
  timer = visibility_extender visibility, sqs_msg, body, &extend_callback

  debug "Worker #{queue} starts performing..."
  worker.perform sqs_msg, body
  debug "Worker #{queue} finishes performing..."

  if auto_delete?
    # A batch arrives as an Array (get_body handles that case too). Calling
    # Array#delete without an argument raises ArgumentError, so every message
    # of the batch is deleted individually.
    sqs_msg.is_a?(Array) ? sqs_msg.each(&:delete) : sqs_msg.delete
  end
ensure
  # timer is nil when an error happened before the extender was started.
  process_cleanup timer
end
process_cleanup(timer)
click to toggle source
# File lib/toiler/actor/processor.rb, line 79
# Releases everything acquired while processing a message.
#
# Shuts down the visibility timer (if any) and clears ActiveRecord's active
# connections when ActiveRecord is loaded. The fetcher notification and the
# reset of the busy state live in +ensure+: if the timer shutdown or the
# connection clearing raises, the processor must still report that it has
# finished, otherwise it would stay marked as executing forever. The error
# itself still propagates to the caller.
#
# @param timer the object returned by visibility_extender, or nil
def process_cleanup(timer)
  debug "Processor #{queue} starts cleanup after perform..."
  timer.shutdown if timer
  ::ActiveRecord::Base.clear_active_connections! if defined? ActiveRecord
ensure
  processor_finished
  @executing = false
  @thread = nil
  debug "Processor #{queue} finished cleanup after perform..."
end
process_init()
click to toggle source
# File lib/toiler/actor/processor.rb, line 73
# Marks the processor as busy and records the thread doing the work.
def process_init
  @thread = Thread.current
  @executing = true
  debug "Processor #{queue} begins processing..."
end
processor_finished()
click to toggle source
# File lib/toiler/actor/processor.rb, line 89
# Notifies the fetcher that this processor is done by sending it the
# +:processor_finished+ message.
def processor_finished
  fetcher.tell(:processor_finished)
end
visibility_extender(queue_visibility, sqs_msg, body) { |sqs_msg, body| ... }
click to toggle source
# File lib/toiler/actor/processor.rb, line 93
# Starts a periodic task that keeps re-applying the visibility timeout of the
# message while it is being processed.
#
# Nothing is started (and nil is returned) unless auto visibility timeout is
# enabled. Otherwise a Concurrent::TimerTask is executed whose execution and
# timeout intervals are both a third of +queue_visibility+, but never less
# than 1. On every run the message's visibility timeout is set to
# +queue_visibility+ and the optional block is called with the message and
# its body. Failures are logged; the task shuts itself down when the error
# message reports an invalid receipt handle.
#
# @param queue_visibility the visibility timeout to re-apply
# @param sqs_msg the message being processed
# @param body the parsed body, only passed on to the block
# @yield [sqs_msg, body] after each extension, when a block is given
# @return the result of Concurrent::TimerTask.execute, or nil when disabled
def visibility_extender(queue_visibility, sqs_msg, body)
  return unless auto_visibility_timeout?

  period = [1, queue_visibility / 3].max
  Concurrent::TimerTask.execute(execution_interval: period, timeout_interval: period) do |task|
    begin
      sqs_msg.visibility_timeout = queue_visibility
      yield sqs_msg, body if block_given?
    rescue StandardError => e
      error "Processor #{queue} failed to extend visibility of message - #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}"
      # An invalid receipt handle cannot be extended any more, so stop trying.
      task.shutdown if e.message.include?('ReceiptHandle is invalid')
    end
  end
end