class Roundhouse::Processor
The Processor
receives a message from the Manager
and actually processes it. It instantiates the worker, runs the middleware chain and then calls Roundhouse::Worker#perform.
Constants
- STATS_TIMEOUT
To prevent a memory leak, ensure that stats expire. However, they should take up a minimal amount of storage so keep them around for a long time
Attributes
proxy_id[RW]
Public Class Methods
default_middleware()
click to toggle source
# File lib/roundhouse/processor.rb, line 21 def self.default_middleware Middleware::Chain.new do |m| m.add Middleware::Server::Logging m.add Middleware::Server::RetryJobs if defined?(::ActiveRecord::Base) require 'roundhouse/middleware/server/active_record' m.add Roundhouse::Middleware::Server::ActiveRecord end end end
new(boss)
click to toggle source
# File lib/roundhouse/processor.rb, line 34 def initialize(boss) @boss = boss end
Public Instance Methods
execute_job(worker, cloned_args)
click to toggle source
# File lib/roundhouse/processor.rb, line 79 def execute_job(worker, cloned_args) worker.perform(*cloned_args) end
inspect()
click to toggle source
# File lib/roundhouse/processor.rb, line 75 def inspect "<Processor##{object_id.to_s(16)}>" end
process(work)
click to toggle source
# File lib/roundhouse/processor.rb, line 38 def process(work) msgstr = work.message queue = work.queue_id @boss.async.real_thread(proxy_id, Thread.current) ack = false begin msg = Roundhouse.load_json(msgstr) klass = msg['class'.freeze].constantize worker = klass.new worker.jid = msg['jid'.freeze] stats(worker, msg, queue) do Roundhouse.server_middleware.invoke(worker, msg, queue) do ack = true execute_job(worker, cloned(msg['args'.freeze])) end end ack = true rescue Roundhouse::Shutdown # Had to force kill this job because it didn't finish # within the timeout. Don't acknowledge the work since # we didn't properly finish it. ack = false rescue Exception => ex handle_exception(ex, msg || { :message => msgstr }) raise ensure # Put the queue back into rotation Roundhouse.redis { |conn| Roundhouse::Monitor.push(conn, queue) } work.acknowledge if ack end @boss.async.processor_done(current_actor) end
Private Instance Methods
cloned(ary)
click to toggle source
Deep clone the arguments passed to the worker so that if the message fails, what is pushed back onto Redis hasn’t been mutated by the worker.
# File lib/roundhouse/processor.rb, line 136 def cloned(ary) Marshal.load(Marshal.dump(ary)) end
pause_for_recovery(retry_count)
click to toggle source
# File lib/roundhouse/processor.rb, line 158 def pause_for_recovery(retry_count) sleep(retry_count) end
retry_and_suppress_exceptions(max_retries = 5) { || ... }
click to toggle source
If an exception occurs in the block passed to this method, that block will be retried up to max_retries times. All exceptions will be swallowed and logged.
# File lib/roundhouse/processor.rb, line 142 def retry_and_suppress_exceptions(max_retries = 5) retry_count = 0 begin yield rescue => e retry_count += 1 if retry_count <= max_retries Roundhouse.logger.debug {"Suppressing and retrying error: #{e.inspect}"} pause_for_recovery(retry_count) retry else handle_exception(e, { :message => "Exhausted #{max_retries} retries"}) end end end
stats(worker, msg, queue) { || ... }
click to toggle source
# File lib/roundhouse/processor.rb, line 89 def stats(worker, msg, queue) # Do not conflate errors from the job with errors caused by updating # stats so calling code can react appropriately retry_and_suppress_exceptions do hash = Roundhouse.dump_json({:queue => queue, :payload => msg, :run_at => Time.now.to_i }) Roundhouse.redis do |conn| conn.multi do conn.hmset("#{identity}:workers", thread_identity, hash) conn.expire("#{identity}:workers", 60*60*4) end end end nowdate = Time.now.utc.strftime("%Y-%m-%d".freeze) begin yield rescue Exception retry_and_suppress_exceptions do failed = "stat:failed:#{Time.now.utc.to_date}" failed = "stat:failed:#{nowdate}" Roundhouse.redis do |conn| conn.multi do conn.incrby("stat:failed".freeze, 1) conn.incrby(failed, 1) conn.expire(failed, STATS_TIMEOUT) end end end raise ensure retry_and_suppress_exceptions do processed = "stat:processed:#{nowdate}" Roundhouse.redis do |conn| conn.multi do conn.hdel("#{identity}:workers", thread_identity) conn.incrby("stat:processed".freeze, 1) conn.incrby(processed, 1) conn.expire(processed, STATS_TIMEOUT) end end end end end
thread_identity()
click to toggle source
# File lib/roundhouse/processor.rb, line 85 def thread_identity @str ||= Thread.current.object_id.to_s(36) end