class DispatchRider::Demultiplexer

Attributes

dispatcher[R]
error_handler[R]
queue[R]

Public Class Methods

new(queue, dispatcher, error_handler) click to toggle source
# File lib/dispatch-rider/demultiplexer.rb, line 12
# Builds a demultiplexer around its three collaborators.
#
# queue         - object whose #pop is called with a block to fetch a message.
# dispatcher    - object whose #dispatch(message) processes a message.
# error_handler - object whose #call(message, exception) reports failures.
#
# The instance starts in the "continue" state with no message in flight.
def initialize(queue, dispatcher, error_handler)
  @queue, @dispatcher, @error_handler = queue, dispatcher, error_handler
  @continue = true        # cleared by #stop
  @current_message = nil  # set only while a message is being dispatched
end

Public Instance Methods

start() click to toggle source
# File lib/dispatch-rider/demultiplexer.rb, line 20
# Runs the processing loop: on every iteration it sleeps for one second and
# then handles the next queue item.
#
# Any StandardError that escapes an iteration is reported to the error
# handler, wrapped in a synthetic "TopLevelError" message with an empty body,
# and the loop is then terminated by throwing :done.
#
# Returns self once the loop has finished.
def start
  do_loop do
    begin
      sleep 1
      handle_next_queue_item
    rescue => error
      top_level_message = Message.new(subject: "TopLevelError", body: {})
      error_handler.call(top_level_message, error)
      throw :done # caught by the catch(:done) in do_loop
    end
  end
  self
end
stop(reason: nil) click to toggle source
# File lib/dispatch-rider/demultiplexer.rb, line 33
# Clears the continue flag so the processing loop ends before its next
# iteration.
#
# reason - optional value passed to the lifecycle logger (default: nil).
#
# When a message is currently being processed, the stop request is logged
# together with that message; otherwise nothing is logged and nil is returned.
def stop(reason: nil)
  @continue = false
  return unless @current_message

  Logging::LifecycleLogger.log_got_stop(reason, @current_message)
end

Private Instance Methods

dispatch_message(message) click to toggle source

This must return true or false depending on whether the jobs succeeded.

# File lib/dispatch-rider/demultiplexer.rb, line 48
# Hands one message to the dispatcher while it is tracked as the current
# message.
#
# Returns the dispatcher's result. If a StandardError is raised, it is passed
# to handle_message_error and false is returned. An exception raised by
# handle_message_error itself is not rescued here.
def dispatch_message(message)
  begin
    with_current_message(message) { dispatcher.dispatch(message) }
  rescue => error
    handle_message_error(message, error)
    false
  end
end
do_loop() { || ... } click to toggle source
# File lib/dispatch-rider/demultiplexer.rb, line 57
# Yields to the given block repeatedly for as long as keep_going? is truthy
# and the continue flag is still set; both are re-checked before every
# iteration, keep_going? first.
#
# The loop runs inside catch(:done), so the block can end it early with
# `throw :done`. Returns nil when the loop ends by itself.
def do_loop
  catch(:done) do
    yield while keep_going? && @continue
  end
end
handle_message_error(message, exception) click to toggle source
# File lib/dispatch-rider/demultiplexer.rb, line 76
# Reports a failed message to the error handler.
#
# message   - the message that was being processed.
# exception - the error raised while processing it.
#
# Returns whatever the error handler returns. If the error handler itself
# raises a StandardError, that failure is logged through the lifecycle logger
# and then raised again to the caller.
def handle_message_error(message, exception)
  begin
    error_handler.call(message, exception)
  rescue => handler_failure # the error handler crashed
    Logging::LifecycleLogger.log_error_handler_fail(message, handler_failure)
    raise handler_failure
  end
end
handle_next_queue_item() click to toggle source
# File lib/dispatch-rider/demultiplexer.rb, line 70
# Pops from the queue, passing a block that dispatches whatever message the
# queue yields. Returns the result of queue.pop.
def handle_next_queue_item
  queue.pop { |item| dispatch_message(item) }
end
keep_going?() click to toggle source
# File lib/dispatch-rider/demultiplexer.rb, line 66
# Loop guard that do_loop evaluates before every iteration.
# This implementation always returns true, so on its own it never ends the
# loop; the loop is ended by the continue flag or by `throw :done`.
def keep_going?
  true
end
logger() click to toggle source
# File lib/dispatch-rider/demultiplexer.rb, line 83
# Returns the logger held by the DispatchRider configuration object.
def logger
  configuration = DispatchRider.config
  configuration.logger
end
with_current_message(message) { || ... } click to toggle source
# File lib/dispatch-rider/demultiplexer.rb, line 40
# Records +message+ as the message currently being processed for the
# duration of the block, then yields.
#
# The record is reset to nil afterwards even if the block raises.
# Returns the block's value.
def with_current_message(message)
  @current_message = message
  begin
    yield
  ensure
    @current_message = nil
  end
end