class Toiler::Actor::Processor

Responsible for processing SQS messages and notifying the Fetcher when done.

Attributes

body_parser[RW]
executing[RW]
extend_callback[RW]
fetcher[RW]
queue[RW]
thread[RW]
worker_class[RW]

Public Class Methods

new(queue) click to toggle source
# File lib/toiler/actor/processor.rb, line 13
# Builds a processor bound to a single queue.
#
# The processor starts idle (not executing, no thread recorded), resolves the
# worker class registered for +queue+ in Toiler.worker_class_registry and
# then caches the worker's options via init_options.
#
# @param queue key used to look up the worker class in the registry
def initialize(queue)
  # Idle state first; these assignments are independent of the lookup below.
  @executing = false
  @thread = nil

  @queue = queue
  @worker_class = Toiler.worker_class_registry[queue]
  init_options
end

Public Instance Methods

default_executor() click to toggle source
# File lib/toiler/actor/processor.rb, line 21
# Returns the executor obtained from Concurrent.global_io_executor.
#
# NOTE(review): presumably consulted by the actor framework to decide where
# this actor runs - confirm against the Concurrent actor documentation.
def default_executor
  Concurrent.global_io_executor
end
executing?() click to toggle source
# File lib/toiler/actor/processor.rb, line 37
# Reports whether this processor is currently handling a message.
#
# The flag starts out false, is set to true by process_init and is reset to
# false by process_cleanup.
#
# @return the current value of @executing
def executing?
  @executing
end
on_message(msg) click to toggle source
# File lib/toiler/actor/processor.rb, line 29
# Dispatches a received message to the method it names.
#
# The first element of +msg+ is the method name and the remaining elements are
# passed as its arguments (a bare, non-Array message is treated as a method
# name with no arguments). The method is invoked with +send+, so private
# methods such as +process+ are reachable.
#
# Failures are reported through +error+ instead of being raised.
#
# @param msg [Array, Object] method name followed by its arguments
# @return the invoked method's result, or whatever +error+ returns on failure
def on_message(msg)
  action, *arguments = msg
  send(action, *arguments)
rescue StandardError, SystemStackError => failure
  # SystemStackError is listed explicitly because it is not a StandardError:
  # a misbehaving client that recurses too deeply must not take the processor
  # down with it.
  error "Processor #{queue} failed processing, reason: #{failure.class}: #{failure.message}\n#{failure.backtrace.join("\n")}"
end

Private Instance Methods

auto_delete?() click to toggle source
# File lib/toiler/actor/processor.rb, line 55
# Whether a message should be deleted automatically once the worker's
# perform has returned.
#
# The value is cached in init_options from the worker class's auto_delete?.
#
# @return the cached @auto_delete setting
def auto_delete?
  @auto_delete
end
auto_visibility_timeout?() click to toggle source
# File lib/toiler/actor/processor.rb, line 51
# Whether the visibility timeout of an in-flight message should be extended
# periodically while the worker is running (see visibility_extender).
#
# The value is cached in init_options from the worker class's
# auto_visibility_timeout?.
#
# @return the cached @auto_visibility_timeout setting
def auto_visibility_timeout?
  @auto_visibility_timeout
end
get_body(sqs_msg) click to toggle source
# File lib/toiler/actor/processor.rb, line 109
# Extracts the parsed body (or bodies) from what was received.
#
# @param sqs_msg a single message, or an Array of messages for a batch
# @return the result of parse_body for a single message, or an Array with one
#   parsed body per message (in the same order) for a batch
def get_body(sqs_msg)
  return parse_body(sqs_msg) unless sqs_msg.is_a?(Array)

  sqs_msg.map { |single| parse_body(single) }
end
init_options() click to toggle source
# File lib/toiler/actor/processor.rb, line 43
# Caches the worker class's settings on the processor so they do not have to
# be looked up for every message.
#
# Reads auto_visibility_timeout?, auto_delete? and toiler_options from
# @worker_class; from the options it keeps :parser (the body parser) and
# :on_visibility_extend (the callback passed to visibility_extender).
def init_options
  klass = @worker_class
  @auto_visibility_timeout = klass.auto_visibility_timeout?
  @auto_delete = klass.auto_delete?

  settings = klass.toiler_options
  @body_parser = settings[:parser]
  @extend_callback = settings[:on_visibility_extend]
end
parse_body(sqs_msg) click to toggle source
# File lib/toiler/actor/processor.rb, line 117
# Parses the body of one message according to the configured body_parser.
#
# Supported parsers:
# * nil or :text - the raw body is returned unchanged
# * :json        - the body is parsed with JSON.parse
# * a Proc       - called with the whole message (not just its body)
# * anything else - its +load+ method is called with the body
#
# @param sqs_msg a message responding to +body+
# @return the parsed body
# @raise [RuntimeError] "Error parsing the message body: ..." when the parser
#   raises a StandardError
def parse_body(sqs_msg)
  parser = body_parser

  case parser
  when nil, :text
    sqs_msg.body
  when :json
    JSON.parse(sqs_msg.body)
  when Proc
    parser.call(sqs_msg)
  else
    parser.load(sqs_msg.body)
  end
rescue StandardError => failure
  raise "Error parsing the message body: #{failure.message}"
end
process(visibility, sqs_msg) click to toggle source
# File lib/toiler/actor/processor.rb, line 59
# Processes one message (or one batch of messages) with a fresh worker.
#
# Steps: mark the processor as busy, instantiate the worker class, parse the
# body, start the optional visibility extender, call the worker's +perform+
# and, when auto_delete? is enabled, delete what was processed. Cleanup
# (process_cleanup) always runs, whether or not an error was raised; errors
# themselves propagate to the caller.
#
# @param visibility visibility timeout handed to visibility_extender
# @param sqs_msg a single message, or an Array of messages for a batch
def process(visibility, sqs_msg)
  process_init
  worker = @worker_class.new
  body = get_body(sqs_msg)
  # +timer+ stays nil if anything above raises; process_cleanup copes with nil.
  timer = visibility_extender visibility, sqs_msg, body, &extend_callback

  debug "Worker #{queue} starts performing..."
  worker.perform sqs_msg, body
  debug "Worker #{queue} finishes performing..."
  return unless auto_delete?

  # A batch arrives as an Array (get_body handles that case too). Calling
  # #delete on the Array itself would hit Array#delete, which requires an
  # argument and raises ArgumentError, so each message is deleted on its own.
  if sqs_msg.is_a?(Array)
    sqs_msg.each(&:delete)
  else
    sqs_msg.delete
  end
ensure
  process_cleanup timer
end
process_cleanup(timer) click to toggle source
# File lib/toiler/actor/processor.rb, line 79
# Releases everything acquired for the message that was just processed.
#
# Stops the visibility timer (if any), returns ActiveRecord connections to
# their pool when ActiveRecord is loaded, resets the processor's busy state
# and notifies the fetcher that this processor is free again.
#
# The state reset and the fetcher notification sit in an +ensure+ clause: if
# stopping the timer or releasing connections raises, the fetcher must still
# be told, otherwise it would never learn that this processor is available.
# The error itself is not swallowed and still propagates to the caller.
#
# @param timer the object returned by visibility_extender, or nil
def process_cleanup(timer)
  debug "Processor #{queue} starts cleanup after perform..."
  begin
    timer.shutdown if timer
    if defined? ActiveRecord
      ar_base = ::ActiveRecord::Base
      if ar_base.respond_to?(:clear_active_connections!)
        ar_base.clear_active_connections!
      else
        # Newer ActiveRecord releases no longer define the method on Base;
        # the connection handler provides the same operation.
        ar_base.connection_handler.clear_active_connections!
      end
    end
  ensure
    # Reset state before notifying so the processor already reports idle by
    # the time the fetcher reacts.
    @executing = false
    @thread = nil
    processor_finished
  end
  debug "Processor #{queue} finished cleanup after perform..."
end
process_init() click to toggle source
# File lib/toiler/actor/processor.rb, line 73
# Marks the processor as busy before a message is handled.
#
# Records the current thread in @thread and raises the @executing flag; both
# are undone by process_cleanup.
def process_init
  @thread = Thread.current
  @executing = true
  debug("Processor #{queue} begins processing...")
end
processor_finished() click to toggle source
# File lib/toiler/actor/processor.rb, line 89
# Notifies the fetcher that this processor has finished its message by
# sending it the :processor_finished message.
#
# @return whatever the fetcher's +tell+ returns
def processor_finished
  fetcher.tell(:processor_finished)
end
visibility_extender(queue_visibility, sqs_msg, body) { |sqs_msg, body| ... } click to toggle source
# File lib/toiler/actor/processor.rb, line 93
# Starts a periodic task that keeps the message (or every message of a batch)
# invisible to other consumers while the worker is still running.
#
# Does nothing and returns nil unless auto_visibility_timeout? is enabled.
# Otherwise, every third of the queue's visibility timeout (at least every
# second) the visibility timeout is reset to +queue_visibility+, after which
# the optional block is called. Errors raised during a tick are logged via
# +error+; the task stops itself when the error message says the receipt
# handle is invalid.
#
# @param queue_visibility [Integer] the queue's visibility timeout
# @param sqs_msg a single message, or an Array of messages for a batch
# @param body the parsed body (or bodies) passed on to the block
# @yield [sqs_msg, body] called after each successful extension
# @return the timer task returned by Concurrent::TimerTask.execute, or nil
def visibility_extender(queue_visibility, sqs_msg, body)
  return unless auto_visibility_timeout?

  # A batch is passed as an Array, which has no visibility_timeout= of its
  # own; normalise so each message gets extended individually.
  messages = sqs_msg.is_a?(Array) ? sqs_msg : [sqs_msg]

  interval = [1, queue_visibility / 3].max
  Concurrent::TimerTask.execute execution_interval: interval,
                                timeout_interval: interval do |task|
    begin
      messages.each { |message| message.visibility_timeout = queue_visibility }
      yield sqs_msg, body if block_given?
    rescue StandardError => e
      error "Processor #{queue} failed to extend visibility of message - #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}"
      # Retrying cannot succeed once the receipt handle is invalid, so stop.
      task.shutdown if e.message.include?('ReceiptHandle is invalid')
    end
  end
end