class Soda::Processor
Attributes
job_logger[R]
manager[R]
retrier[R]
thread[R]
Public Class Methods
middleware()
click to toggle source
# File lib/soda/processor.rb, line 6
# Returns Soda.server_middleware, the object whose #use is wrapped around
# each job in #execute.
def middleware
  Soda.server_middleware
end
new(manager)
click to toggle source
# File lib/soda/processor.rb, line 11
# Sets up a processor for +manager+. The stop flag starts out false and no
# thread is created here; that happens in #start.
def initialize(manager)
  @stopped    = false
  @manager    = manager
  @retrier    = Retrier.new
  @job_logger = Soda::Logger::JobLogger.new
end
Public Instance Methods
finish()
click to toggle source
# File lib/soda/processor.rb, line 26
# Blocks until the worker thread has exited.
#
# Returns the joined Thread, or nil when #start was never called. With no
# thread to wait for this is a no-op instead of a NoMethodError on nil.
def finish
  worker_thread = thread
  worker_thread.join if worker_thread
end
start()
click to toggle source
# File lib/soda/processor.rb, line 18
# Spawns the worker thread that executes #run, unless one was already
# created. Returns the (memoized) Thread.
def start
  @thread ||= Thread.new { run }
end
stop()
click to toggle source
# File lib/soda/processor.rb, line 22
# Raises the stop flag. The #run loop consults the flag (through #stopped?)
# before each fetch, so this only prevents further iterations.
def stop
  @stopped = true
end
Private Instance Methods
constantize(str)
click to toggle source
# File lib/soda/processor.rb, line 105
# Resolves a constant name to the constant itself by looking it up from
# Object. An unknown name raises NameError (from Object.const_get).
def constantize(str)
  Object.const_get(str)
end
execute(job_hash, msg)
click to toggle source
# File lib/soda/processor.rb, line 74
# Runs one job: resolves the worker class named in job_hash["klass"], then,
# inside the retrier and the middleware, instantiates it with the job hash
# and calls #perform with job_hash["args"]. The message is acknowledged after
# the middleware call returns, still inside the retrier block.
#
# NOTE(review): "klass" comes straight from the message payload and is handed
# to #constantize - presumably only trusted producers can enqueue; confirm.
def execute(job_hash, msg)
  queue  = msg.queue
  worker = constantize(job_hash["klass"])

  retrier.retry(job_hash, msg) do
    self.class.middleware.use(worker, job_hash, queue.name, msg) do
      worker.new(job_hash).perform(*job_hash["args"])
    end

    msg.acknowledge
  end
end
fetch()
click to toggle source
# File lib/soda/processor.rb, line 47
# Asks the manager's fetcher for the next batch of messages and returns
# whatever it returns. A StandardError is passed to #handle_exception and
# then re-raised to the caller.
def fetch
  manager.fetcher.fetch
rescue StandardError => error
  handle_exception(error)
  raise
end
handle_exception(ex)
click to toggle source
For now, don't do much: just log the error. TODO: make this more robust, perhaps by supporting error handlers.
# File lib/soda/processor.rb, line 101
# Reports +ex+ by logging it at error level through +logger+. Nothing else
# is done with the exception; callers decide whether to re-raise.
def handle_exception(ex)
  logger.error(ex)
end
parse_job(str)
click to toggle source
# File lib/soda/processor.rb, line 89
# Decodes a raw message body into a job hash.
#
# Returns the value produced by Soda.load_json when it has a non-nil,
# non-false "klass" and an "args" key. Returns nil for anything else
# (undecodable input, a payload that is not a hash, a missing key), which
# #process treats as an invalid message.
def parse_job(str)
  job_hash = Soda.load_json(str)

  # Look each key up on its own. Chaining the lookups with `&&` let a payload
  # with a null/false "klass" skip the "args" check and pass as valid.
  klass = job_hash.fetch("klass")
  job_hash.fetch("args")

  # Without a class name the job can never be executed, so it is not valid.
  klass ? job_hash : nil
rescue StandardError
  # Best effort by design: any decode/validation failure means "not a job".
  nil
end
process(msg)
click to toggle source
# File lib/soda/processor.rb, line 56
# Handles a single message. A body that #parse_job rejects cannot be worked
# on, so the message is acknowledged (thus removing it) and skipped.
# Otherwise the job is executed inside the job logger and the reloader.
# A StandardError is passed to #handle_exception and then re-raised.
def process(msg)
  job_hash = parse_job(msg.str)
  return msg.acknowledge unless job_hash

  job_logger.with(job_hash) do
    reloader.wrap { execute(job_hash, msg) }
  end
rescue StandardError => error
  handle_exception(error)
  raise
end
reloader()
click to toggle source
# File lib/soda/processor.rb, line 115
# Returns the memoized reloader used to wrap job execution: the Rails
# application's reloader when ::Soda::Rails is defined, otherwise a
# StubReloader.
def reloader
  @reloader ||= begin
    rails_integration = defined?(::Soda::Rails)
    rails_integration ? ::Rails.application.reloader : StubReloader.new
  end
end
run()
click to toggle source
# File lib/soda/processor.rb, line 38
# Thread body: until the stop flag is raised, fetch a batch of messages
# (a nil fetch counts as an empty batch) and process each one in order.
# If anything escapes the loop, the manager is told this processor died.
#
# NOTE(review): rescuing Exception also catches non-StandardError failures
# raised on this thread - presumably deliberate so the manager always hears
# about a dead processor; confirm.
def run
  until stopped?
    batch = fetch || []
    batch.each { |msg| process(msg) }
  end
rescue Exception
  manager.on_died(self)
end
stopped?()
click to toggle source
# File lib/soda/processor.rb, line 34
# Reports the stop flag: false after construction, true once #stop has been
# called.
def stopped?
  @stopped
end