class Soda::Processor

Attributes

job_logger[R]
manager[R]
retrier[R]
thread[R]

Public Class Methods

middleware() click to toggle source
# File lib/soda/processor.rb, line 6
def middleware
  Soda.server_middleware
end
new(manager) click to toggle source
# File lib/soda/processor.rb, line 11
def initialize(manager)
  @manager      = manager
  @retrier      = Retrier.new
  @job_logger   = Soda::Logger::JobLogger.new
  @stopped      = false
end

Public Instance Methods

finish() click to toggle source
# File lib/soda/processor.rb, line 26
def finish
  thread.join
end
start() click to toggle source
# File lib/soda/processor.rb, line 18
def start
  @thread ||= Thread.new(&method(:run))
end
stop() click to toggle source
# File lib/soda/processor.rb, line 22
def stop
  @stopped = true
end

Private Instance Methods

constantize(str) click to toggle source
# File lib/soda/processor.rb, line 105
def constantize(str)
  Object.const_get(str)
end
execute(job_hash, msg) click to toggle source
# File lib/soda/processor.rb, line 74
def execute(job_hash, msg)
  queue   = msg.queue
  klass   = job_hash["klass"]
  worker  = constantize(klass)

  retrier.retry(job_hash, msg) do
    self.class.middleware.use(worker, job_hash, queue.name, msg) do
      instance = worker.new(job_hash)
      instance.perform(*job_hash["args"])
    end

    msg.acknowledge
  end
end
fetch() click to toggle source
# File lib/soda/processor.rb, line 47
def fetch
  fetcher = manager.fetcher
  fetcher.fetch
rescue => ex
  handle_exception(ex)

  raise
end
handle_exception(ex) click to toggle source

For now, don't do much - just log out the error TODO: make this more robust. Maybe support error handlers.

# File lib/soda/processor.rb, line 101
def handle_exception(ex)
  logger.error(ex)
end
parse_job(str) click to toggle source
# File lib/soda/processor.rb, line 89
def parse_job(str)
  Soda.load_json(str).tap do |job_hash|
    # ensure the JSON has an `args` and a `klass` value before considering
    # the message valid.
    job_hash.fetch("klass") && job_hash.fetch("args")
  end
rescue => ex
  nil
end
process(msg) click to toggle source
# File lib/soda/processor.rb, line 56
def process(msg)
  if (job_hash = parse_job(msg.str))
    job_logger.with(job_hash) do
      reloader.wrap do
        execute(job_hash, msg)
      end
    end
  else
    # We can't process the work because the JSON is invalid, so we have to
    # acknowledge the message (thus removing it) and move on.
    msg.acknowledge
  end
rescue => ex
  handle_exception(ex)

  raise
end
reloader() click to toggle source
# File lib/soda/processor.rb, line 115
def reloader
  @reloader ||=
    if defined?(::Soda::Rails)
      application = ::Rails.application
      application.reloader
    else
      StubReloader.new
    end
end
run() click to toggle source
# File lib/soda/processor.rb, line 38
def run
  until stopped?
    msgs = fetch || []
    msgs.each(&method(:process))
  end
rescue Exception => ex
  manager.on_died(self)
end
stopped?() click to toggle source
# File lib/soda/processor.rb, line 34
def stopped?
  @stopped
end