class Roundhouse::Processor

The Processor receives a message from the Manager and actually processes it. It instantiates the worker, runs the middleware chain and then calls Roundhouse::Worker#perform.

Constants

STATS_TIMEOUT

To prevent a memory leak, ensure that stats expire. However, they should take up a minimal amount of storage so keep them around for a long time

Attributes

proxy_id[RW]

Public Class Methods

default_middleware() click to toggle source
# File lib/roundhouse/processor.rb, line 21
def self.default_middleware
  Middleware::Chain.new do |m|
    m.add Middleware::Server::Logging
    m.add Middleware::Server::RetryJobs
    if defined?(::ActiveRecord::Base)
      require 'roundhouse/middleware/server/active_record'
      m.add Roundhouse::Middleware::Server::ActiveRecord
    end
  end
end
new(boss) click to toggle source
# File lib/roundhouse/processor.rb, line 34
def initialize(boss)
  @boss = boss
end

Public Instance Methods

execute_job(worker, cloned_args) click to toggle source
# File lib/roundhouse/processor.rb, line 79
def execute_job(worker, cloned_args)
  worker.perform(*cloned_args)
end
inspect() click to toggle source
# File lib/roundhouse/processor.rb, line 75
def inspect
  "<Processor##{object_id.to_s(16)}>"
end
process(work) click to toggle source
# File lib/roundhouse/processor.rb, line 38
def process(work)
  msgstr = work.message
  queue = work.queue_id

  @boss.async.real_thread(proxy_id, Thread.current)

  ack = false
  begin
    msg = Roundhouse.load_json(msgstr)
    klass  = msg['class'.freeze].constantize
    worker = klass.new
    worker.jid = msg['jid'.freeze]

    stats(worker, msg, queue) do
      Roundhouse.server_middleware.invoke(worker, msg, queue) do
        ack = true
        execute_job(worker, cloned(msg['args'.freeze]))
      end
    end
    ack = true
  rescue Roundhouse::Shutdown
    # Had to force kill this job because it didn't finish
    # within the timeout.  Don't acknowledge the work since
    # we didn't properly finish it.
    ack = false
  rescue Exception => ex
    handle_exception(ex, msg || { :message => msgstr })
    raise
  ensure
    # Put the queue back into rotation
    Roundhouse.redis { |conn| Roundhouse::Monitor.push(conn, queue) }
    work.acknowledge if ack
  end

  @boss.async.processor_done(current_actor)
end

Private Instance Methods

cloned(ary) click to toggle source

Deep clone the arguments passed to the worker so that if the message fails, what is pushed back onto Redis hasn’t been mutated by the worker.

# File lib/roundhouse/processor.rb, line 136
def cloned(ary)
  Marshal.load(Marshal.dump(ary))
end
pause_for_recovery(retry_count) click to toggle source
# File lib/roundhouse/processor.rb, line 158
def pause_for_recovery(retry_count)
  sleep(retry_count)
end
retry_and_suppress_exceptions(max_retries = 5) { || ... } click to toggle source

If an exception occurs in the block passed to this method, that block will be retried up to max_retries times. All exceptions will be swallowed and logged.

# File lib/roundhouse/processor.rb, line 142
def retry_and_suppress_exceptions(max_retries = 5)
  retry_count = 0
  begin
    yield
  rescue => e
    retry_count += 1
    if retry_count <= max_retries
      Roundhouse.logger.debug {"Suppressing and retrying error: #{e.inspect}"}
      pause_for_recovery(retry_count)
      retry
    else
      handle_exception(e, { :message => "Exhausted #{max_retries} retries"})
    end
  end
end
stats(worker, msg, queue) { || ... } click to toggle source
# File lib/roundhouse/processor.rb, line 89
def stats(worker, msg, queue)
  # Do not conflate errors from the job with errors caused by updating
  # stats so calling code can react appropriately
  retry_and_suppress_exceptions do
    hash = Roundhouse.dump_json({:queue => queue, :payload => msg, :run_at => Time.now.to_i })
    Roundhouse.redis do |conn|
      conn.multi do
        conn.hmset("#{identity}:workers", thread_identity, hash)
        conn.expire("#{identity}:workers", 60*60*4)
      end
    end
  end

  nowdate = Time.now.utc.strftime("%Y-%m-%d".freeze)
  begin
    yield
  rescue Exception
    retry_and_suppress_exceptions do
      failed = "stat:failed:#{Time.now.utc.to_date}"
      failed = "stat:failed:#{nowdate}"
      Roundhouse.redis do |conn|
        conn.multi do
          conn.incrby("stat:failed".freeze, 1)
          conn.incrby(failed, 1)
          conn.expire(failed, STATS_TIMEOUT)
        end
      end
    end
    raise
  ensure
    retry_and_suppress_exceptions do
      processed = "stat:processed:#{nowdate}"
      Roundhouse.redis do |conn|
        conn.multi do
          conn.hdel("#{identity}:workers", thread_identity)
          conn.incrby("stat:processed".freeze, 1)
          conn.incrby(processed, 1)
          conn.expire(processed, STATS_TIMEOUT)
        end
      end
    end
  end
end
thread_identity() click to toggle source
# File lib/roundhouse/processor.rb, line 85
def thread_identity
  @str ||= Thread.current.object_id.to_s(36)
end