class DispatchRider::Demultiplexer

Attributes

dispatcher[R]
error_handler[R]
queue[R]

Public Class Methods

new(queue, dispatcher, error_handler) click to toggle source
# File lib/dispatch-rider/demultiplexer.rb, line 10
def initialize(queue, dispatcher, error_handler)
  @queue = queue
  @dispatcher = dispatcher
  @error_handler = error_handler
  @continue = true
  @current_message = nil
end

Public Instance Methods

start() click to toggle source
# File lib/dispatch-rider/demultiplexer.rb, line 18
def start
  do_loop do
    begin
      sleep 1
      handle_next_queue_item
    rescue => exception
      error_handler.call(Message.new(subject: "TopLevelError", body: {}), exception)
      throw :done
    end
  end
  self
end
stop(reason: nil) click to toggle source
# File lib/dispatch-rider/demultiplexer.rb, line 31
def stop(reason: nil)
  @continue = false
  Logging::LifecycleLogger.log_got_stop reason, @current_message if @current_message
end

Private Instance Methods

dispatch_message(message) click to toggle source

This needs to return true/false based on the success of the jobs!

# File lib/dispatch-rider/demultiplexer.rb, line 48
def dispatch_message(message)
  with_current_message(message) do
    dispatcher.dispatch(message)
  end
rescue => exception
  handle_message_error message, exception
  false
end
do_loop() { || ... } click to toggle source
# File lib/dispatch-rider/demultiplexer.rb, line 57
def do_loop
  catch(:done) do
    while keep_going?
      throw :done unless @continue
      yield
    end
  end
end
handle_message_error(message, exception) click to toggle source
# File lib/dispatch-rider/demultiplexer.rb, line 76
def handle_message_error(message, exception)
  begin
    error_handler.call(message, exception)
  rescue => error_handler_exception # the error handler crashed
    Logging::LifecycleLogger.log_error_handler_fail message, error_handler_exception
    raise error_handler_exception
  end
end
handle_next_queue_item() click to toggle source
# File lib/dispatch-rider/demultiplexer.rb, line 70
def handle_next_queue_item
  queue.pop do |message|
    dispatch_message(message)
  end
end
keep_going?() click to toggle source
# File lib/dispatch-rider/demultiplexer.rb, line 66
def keep_going?
  true
end
logger() click to toggle source
# File lib/dispatch-rider/demultiplexer.rb, line 85
def logger
  DispatchRider.config.logger
end
with_current_message(message) { || ... } click to toggle source
# File lib/dispatch-rider/demultiplexer.rb, line 38
def with_current_message(message)
  begin
    @current_message = message
    yield
  ensure
    @current_message = nil
  end
end