class Tochtli::BaseController::Dispatcher
Attributes
cache[R]
controller_class[R]
logger[R]
rabbit_connection[R]
Public Class Methods
new(controller_class, rabbit_connection, cache, logger)
click to toggle source
# File lib/tochtli/base_controller.rb, line 202
# Builds a dispatcher bound to one controller class.
#
# Keeps the four collaborators that are exposed through the readers, starts
# with an empty queue registry, resolves the callable application through
# Tochtli.application.to_app and creates a fresh ProcessCounter.
#
# @param controller_class  stored as-is; exposed through #controller_class
# @param rabbit_connection stored as-is; exposed through #rabbit_connection
# @param cache             stored as-is; exposed through #cache
# @param logger            stored as-is; exposed through #logger
def initialize(controller_class, rabbit_connection, cache, logger)
  # Collaborators handed in by the caller.
  @controller_class  = controller_class
  @rabbit_connection = rabbit_connection
  @cache             = cache
  @logger            = logger

  # Bookkeeping that starts out empty.
  @queues      = {}
  @initial_env = nil

  # Objects created here.
  @application     = Tochtli.application.to_app
  @process_counter = ProcessCounter.new
end
Public Instance Methods
process_message(delivery_info, properties, payload, initial_env)
click to toggle source
# File lib/tochtli/base_controller.rb, line 227
# Runs a single delivered message through the application.
#
# The env handed to the application is +initial_env+ merged with the message
# parts and the dispatcher's collaborators; on key clashes the values built
# here win because they are the argument of Hash#merge. The process counter
# is incremented before the call and decremented in +ensure+, so it is
# balanced on both the success and the failure path.
#
# NOTE(review): the rescue clause catches Exception, which also covers
# SignalException and SystemExit raised while the message is handled —
# confirm that swallowing those is intended.
#
# @param delivery_info stored under :delivery_info in the env
# @param properties    stored under :properties in the env
# @param payload       stored under :payload in the env
# @param initial_env   [Hash] base env the message-specific keys are merged into
# @return the application's return value, or false when anything was raised
def process_message(delivery_info, properties, payload, initial_env)
  register_process_start

  message_env = {
    delivery_info:     delivery_info,
    properties:        properties,
    payload:           payload,
    controller_class:  controller_class,
    rabbit_connection: rabbit_connection,
    cache:             cache,
    logger:            logger
  }

  @application.call(initial_env.merge(message_env))
rescue Exception => error
  logger.error "\nUNEXPECTED EXCEPTION: #{error.class.name} (#{error.message})"
  logger.error error.backtrace.join("\n")
  false
ensure
  register_process_end
end
queues()
click to toggle source
# File lib/tochtli/base_controller.rb, line 286
# Lists the queue objects of all current subscriptions.
#
# @return [Array] the :queue value of every registry entry, in registry order
def queues
  @queues.each_value.map { |entry| entry[:queue] }
end
restart(options={})
click to toggle source
# File lib/tochtli/base_controller.rb, line 217
# Shuts the dispatcher down and then re-subscribes every queue that was
# registered at the time of the call.
#
# The registry is copied first because stopping replaces it with an empty
# hash. Each saved entry is handed back to #start in the order #start
# declares its parameters (queue_name, routing_keys, initial_env). The
# previous two-argument call placed the saved initial env in the
# routing_keys position and silently dropped the env itself.
#
# NOTE(review): routing keys are read from the entry's :routing_keys. An
# entry that does not carry that key forwards nil — confirm the controller's
# queue factory accepts nil routing keys.
#
# @param options [Hash] passed unchanged to #shutdown (e.g. :timeout)
# @return [Hash] the copy of the registry taken before the shutdown
def restart(options={})
  previous = @queues.dup
  shutdown options
  previous.each do |queue_name, queue_opts|
    start queue_name, queue_opts[:routing_keys], queue_opts[:initial_env]
  end
end
shutdown(options={})
click to toggle source
Performs a graceful shutdown of the dispatcher, i.e. waits for all processes to end. If the timeout is reached, the shutdown is forced. Useful with dynamic reconfiguration of the work pool size.
# File lib/tochtli/base_controller.rb, line 265
# Waits for running processes, then stops all consumers.
#
# #stop is called whether or not the wait completed in time.
#
# @param options [Hash] :timeout is passed to #wait_for_processes and
#   defaults to 15 (presumably seconds — confirm against ProcessCounter#wait)
# @return whatever #stop returns
def shutdown(options={})
  timeout = options.fetch(:timeout, 15)
  wait_for_processes(timeout)
  stop
end
start(queue_name, routing_keys, initial_env={})
click to toggle source
# File lib/tochtli/base_controller.rb, line 213
# Starts consuming from a queue; a thin wrapper around #subscribe_queue.
#
# @param queue_name   forwarded to #subscribe_queue
# @param routing_keys forwarded to #subscribe_queue
# @param initial_env  [Hash] forwarded to #subscribe_queue (defaults to {})
# @return whatever #subscribe_queue returns
def start(queue_name, routing_keys, initial_env={})
  subscribe_queue queue_name, routing_keys, initial_env
end
started?(queue_name=nil)
click to toggle source
# File lib/tochtli/base_controller.rb, line 278
# Tells whether the dispatcher has active subscriptions.
#
# @param queue_name when given (truthy), only that queue is checked
# @return [Boolean] with a queue name: whether the registry has that key;
#   without one: whether the registry has any entry at all
def started?(queue_name=nil)
  return @queues.key?(queue_name) if queue_name

  !@queues.empty?
end
stop(queues=nil)
click to toggle source
# File lib/tochtli/base_controller.rb, line 270
# Cancels the consumer of every registered queue and empties the registry.
#
# A Bunny::ConnectionClosedError raised while cancelling is swallowed; the
# remaining consumers are then not cancelled. The registry is reset in
# +ensure+, so it ends up empty on every path, including when some other
# error propagates.
#
# @param queues accepted but not used by this method
# @return the registry that was iterated, or nil when the closed-connection
#   error was swallowed
def stop(queues=nil)
  @queues.each_value do |entry|
    entry[:consumer].cancel
  end
rescue Bunny::ConnectionClosedError
  # ignore closed connection error
ensure
  @queues = {}
end
subscribe_queue(queue_name, routing_keys, initial_env={})
click to toggle source
# File lib/tochtli/base_controller.rb, line 250
# Creates the queue through the controller class, subscribes to it and
# records the subscription in the registry under +queue_name+.
#
# Every delivery is passed to #process_message together with +initial_env+.
# The registry entry keeps the queue, the consumer returned by
# Queue#subscribe, the initial env and — in addition to what was stored
# before — the routing keys, so that all arguments of this call can be read
# back from the registry later.
#
# @param queue_name   name passed to the controller's create_queue and used
#   as the registry key
# @param routing_keys passed to the controller's create_queue and stored
#   under :routing_keys
# @param initial_env  [Hash] base env for every message from this queue
# @return [Hash] the registry entry that was stored
def subscribe_queue(queue_name, routing_keys, initial_env={})
  queue = controller_class.create_queue(@rabbit_connection, queue_name, routing_keys)

  consumer = queue.subscribe do |delivery_info, metadata, payload|
    process_message delivery_info, metadata, payload, initial_env
  end

  @queues[queue_name] = {
    queue:        queue,
    consumer:     consumer,
    routing_keys: routing_keys,
    initial_env:  initial_env
  }
end
Protected Instance Methods
register_process_end()
click to toggle source
# File lib/tochtli/base_controller.rb, line 296
# Marks one message process as finished by decrementing the process counter.
#
# @return whatever the counter's #decrement returns
def register_process_end
  @process_counter.decrement()
end
register_process_start()
click to toggle source
# File lib/tochtli/base_controller.rb, line 292
# Marks one message process as started by incrementing the process counter.
#
# @return whatever the counter's #increment returns
def register_process_start
  @process_counter.increment()
end
wait_for_processes(timeout)
click to toggle source
# File lib/tochtli/base_controller.rb, line 300
# Delegates to the process counter's #wait with the given timeout.
#
# @param timeout passed unchanged to the counter's #wait
# @return whatever the counter's #wait returns
def wait_for_processes(timeout)
  counter = @process_counter
  counter.wait(timeout)
end