class Tochtli::BaseController::Dispatcher

Attributes

cache[R]
controller_class[R]
logger[R]
rabbit_connection[R]

Public Class Methods

new(controller_class, rabbit_connection, cache, logger) click to toggle source
# File lib/tochtli/base_controller.rb, line 202
# Builds a dispatcher for +controller_class+.
#
# Keeps the supplied collaborators, resolves the application object through
# Tochtli.application.to_app and starts with no subscribed queues.
#
# @param controller_class  controller class whose +create_queue+ is used when subscribing
# @param rabbit_connection connection handed to +create_queue+ and placed in the message env
# @param cache             cache object placed in the message env
# @param logger            logger used to report unexpected exceptions
def initialize(controller_class, rabbit_connection, cache, logger)
  # Collaborators supplied by the caller.
  @controller_class = controller_class
  @rabbit_connection = rabbit_connection
  @cache = cache
  @logger = logger

  # Objects created here.
  @application = Tochtli.application.to_app
  @process_counter = ProcessCounter.new

  # Initial bookkeeping: no subscriptions yet.
  @queues = {}
  @initial_env = nil
end

Public Instance Methods

process_message(delivery_info, properties, payload, initial_env) click to toggle source
# File lib/tochtli/base_controller.rb, line 227
# Runs a single delivered message through the application.
#
# The env passed to the application is +initial_env+ merged with the delivery
# data and this dispatcher's collaborators (message-specific keys win on
# conflict). The process counter is incremented before and decremented after
# the call, whether or not it succeeds.
#
# @param delivery_info delivery information supplied by the consumer callback
# @param properties    message properties/metadata
# @param payload       message body
# @param initial_env   [Hash] base env the message-specific keys are merged into
# @return the application's return value, or +false+ when any exception was raised
def process_message(delivery_info, properties, payload, initial_env)
  register_process_start

  message_env = {
    delivery_info: delivery_info,
    properties: properties,
    payload: payload,
    controller_class: controller_class,
    rabbit_connection: rabbit_connection,
    cache: cache,
    logger: logger
  }

  @application.call(initial_env.merge(message_env))
rescue Exception => error
  # Catch-all: nothing raised while handling a message propagates to the caller;
  # the failure is logged and reported as +false+.
  logger.error "\nUNEXPECTED EXCEPTION: #{error.class.name} (#{error.message})"
  logger.error error.backtrace.join("\n")
  false
ensure
  register_process_end
end
queues() click to toggle source
# File lib/tochtli/base_controller.rb, line 286
# Returns the queue objects of all current subscriptions, in subscription order.
#
# @return [Array] the +:queue+ value of every entry in the subscription table
def queues
  @queues.values.map { |entry| entry[:queue] }
end
restart(options={}) click to toggle source
# File lib/tochtli/base_controller.rb, line 217
# Shuts the dispatcher down and re-subscribes every queue that was active.
#
# A snapshot of the subscription table is taken first because +shutdown+
# ends by clearing it.
#
# Each queue is started again with its stored +:initial_env+ in the
# +initial_env+ position of +start+. (Previously the env was passed as the
# second argument, i.e. as +routing_keys+, and the real env was lost.)
#
# NOTE(review): routing keys are read from the entry's +:routing_keys+; when an
# entry does not record them +nil+ is passed — confirm how +create_queue+
# treats +nil+ routing keys.
#
# @param options [Hash] forwarded unchanged to +shutdown+ (e.g. +:timeout+)
# @return [Hash] the snapshot of the previous subscription table
def restart(options={})
  previous = @queues.dup

  shutdown options

  previous.each do |queue_name, queue_opts|
    start queue_name, queue_opts[:routing_keys], queue_opts[:initial_env]
  end
end
shutdown(options={}) click to toggle source

Performs a graceful shutdown of the dispatcher, i.e. waits for all running processes to end. If the timeout is reached, the shutdown is forced. Useful for dynamic reconfiguration of the work pool size.

# File lib/tochtli/base_controller.rb, line 265
# Waits for in-flight message processing, then stops all consumers.
#
# +stop+ is called regardless of what the wait returned, so the shutdown
# proceeds even if the wait did not complete in time.
#
# @param options [Hash] +:timeout+ is handed to +wait_for_processes+ (default 15)
# @return whatever +stop+ returns
def shutdown(options={})
  timeout = options.fetch(:timeout, 15)
  wait_for_processes(timeout)
  stop
end
start(queue_name, routing_keys, initial_env={}) click to toggle source
# File lib/tochtli/base_controller.rb, line 213
# Starts consuming from +queue_name+ by delegating to +subscribe_queue+.
#
# @param queue_name   name of the queue to subscribe to
# @param routing_keys routing keys forwarded to +subscribe_queue+
# @param initial_env  [Hash] base env used for every message from this queue
# @return whatever +subscribe_queue+ returns
def start(queue_name, routing_keys, initial_env={})
  subscribe_queue(queue_name, routing_keys, initial_env)
end
started?(queue_name=nil) click to toggle source
# File lib/tochtli/base_controller.rb, line 278
# Tells whether the dispatcher is consuming.
#
# @param queue_name when given, only that queue's subscription is checked;
#   otherwise any subscription counts
# @return [Boolean]
def started?(queue_name=nil)
  return @queues.key?(queue_name) if queue_name

  !@queues.empty?
end
stop(queues=nil) click to toggle source
# File lib/tochtli/base_controller.rb, line 270
# Cancels the consumer of every subscribed queue and clears the subscription table.
#
# A Bunny::ConnectionClosedError raised while cancelling is ignored. The table
# is reset in every case, including when some other error propagates.
#
# NOTE: the +queues+ argument is accepted but not used; all consumers are cancelled.
#
# @param queues unused
# @return [Hash, nil] the previous subscription table, or +nil+ when the
#   connection-closed error was swallowed
def stop(queues=nil)
  @queues.each do |_name, entry|
    entry[:consumer].cancel
  end
rescue Bunny::ConnectionClosedError
  # The connection is already closed, so there is nothing left to cancel.
ensure
  @queues = {}
end
subscribe_queue(queue_name, routing_keys, initial_env={}) click to toggle source
# File lib/tochtli/base_controller.rb, line 250
# Creates the queue through the controller class, subscribes to it and records
# the subscription under +queue_name+.
#
# Every delivered message is forwarded to +process_message+ together with
# +initial_env+.
#
# The recorded entry keeps all arguments the subscription was created with —
# including +routing_keys+, which previously was not stored — so the full set
# of subscription parameters can be read back from the entry.
#
# @param queue_name   name of the queue; also the key of the recorded entry
# @param routing_keys routing keys passed to +controller_class.create_queue+
# @param initial_env  [Hash] base env for every message from this queue
# @return [Hash] the recorded entry (+:queue+, +:consumer+, +:routing_keys+, +:initial_env+)
def subscribe_queue(queue_name, routing_keys, initial_env={})
  queue    = controller_class.create_queue(@rabbit_connection, queue_name, routing_keys)
  consumer = queue.subscribe do |delivery_info, metadata, payload|
    process_message delivery_info, metadata, payload, initial_env
  end

  @queues[queue_name] = {
      queue:        queue,
      consumer:     consumer,
      routing_keys: routing_keys,
      initial_env:  initial_env
  }
end

Protected Instance Methods

register_process_end() click to toggle source
# File lib/tochtli/base_controller.rb, line 296
# Marks one message as finished by decrementing the process counter.
# Called from the +ensure+ section of +process_message+.
def register_process_end
  @process_counter.decrement
end
register_process_start() click to toggle source
# File lib/tochtli/base_controller.rb, line 292
# Marks one message as in progress by incrementing the process counter.
# Called at the beginning of +process_message+.
def register_process_start
  @process_counter.increment
end
wait_for_processes(timeout) click to toggle source
# File lib/tochtli/base_controller.rb, line 300
# Delegates to the process counter's +wait+ with the given timeout and returns
# its result.
#
# NOTE(review): presumably blocks until no message is being processed or the
# timeout elapses — confirm against ProcessCounter#wait.
#
# @param timeout value passed through to +@process_counter.wait+
def wait_for_processes(timeout)
  @process_counter.wait(timeout)
end