class Tochtli::BaseController
Attributes
delivery_info[R]
env[R]
logger[R]
message[R]
Public Class Methods
bind(*routing_keys)
click to toggle source
# File lib/tochtli/base_controller.rb, line 54
# Adds the given routing keys to this controller's routing key collection.
#
# @param routing_keys [Array] keys to merge into +self.routing_keys+
# @return whatever +merge+ on the routing key collection returns
def bind(*routing_keys)
  registered = self.routing_keys
  registered.merge(routing_keys)
end
create_queue(rabbit_connection, queue_name=nil, routing_keys=nil)
click to toggle source
# File lib/tochtli/base_controller.rb, line 131
# Opens a channel, declares the exchange and the queue using the
# controller's settings, and binds the queue to every routing key.
#
# @param rabbit_connection connection used to create the channel; also
#   supplies the exchange name when the controller does not define one
# @param queue_name [String, nil] falls back to the controller's queue_name
# @param routing_keys [Enumerable, nil] falls back to the controller's routing_keys
# @return the declared queue
def create_queue(rabbit_connection, queue_name=nil, routing_keys=nil)
  queue_name   ||= self.queue_name
  routing_keys ||= self.routing_keys

  channel         = rabbit_connection.create_channel(work_pool_size)
  target_exchange = self.exchange_name || rabbit_connection.exchange_name

  # exchange_type names the channel method that declares the exchange
  exchange = channel.send(exchange_type, target_exchange, durable: exchange_durable)
  queue    = channel.queue(
    queue_name,
    durable:     queue_durable,
    exclusive:   queue_exclusive,
    auto_delete: queue_auto_delete
  )

  routing_keys.each { |key| queue.bind(exchange, routing_key: key) }
  queue
end
find_message_route(routing_key)
click to toggle source
# File lib/tochtli/base_controller.rb, line 126
# Looks up the first registered handler whose pattern matches the routing key.
#
# @param routing_key routing key of the incoming message
# @return the first matching handler, or nil when none matches
# @raise [RuntimeError] when no handlers have been registered
def find_message_route(routing_key)
  raise "Routing not set up" if message_handlers.empty?

  message_handlers.find do |route|
    route.pattern =~ routing_key
  end
end
inherited(controller)
click to toggle source
# File lib/tochtli/base_controller.rb, line 47
# Inheritance hook: gives every new controller class its own routing state
# and registers it with ControllerManager.
#
# @param controller [Class] the newly defined subclass
# @return the result of ControllerManager.register
def inherited(controller)
  # Keep the hook chain intact so other +inherited+ hooks in the ancestor
  # chain still run.
  super

  controller.routing_keys     = Set.new
  controller.message_handlers = []
  # Default queue name is derived from the class name, with namespace
  # separators turned into slashes.
  controller.queue_name = controller.name.underscore.gsub('::', '/')

  ControllerManager.register(controller)
end
new(rabbit_connection, cache, logger)
click to toggle source
# File lib/tochtli/base_controller.rb, line 152
# Stores the collaborators handed to the controller instance.
#
# @param rabbit_connection connection later used by #reply to publish
# @param cache cache object kept on the instance
# @param logger logger kept on the instance
def initialize(rabbit_connection, cache, logger)
  @logger            = logger
  @cache             = cache
  @rabbit_connection = rabbit_connection
end
off(routing_key)
click to toggle source
# File lib/tochtli/base_controller.rb, line 74
# Removes every registered handler bound to the given routing key.
#
# @param routing_key routing key whose handlers should be dropped
# @return the handler collection after removal
def off(routing_key)
  message_handlers.delete_if do |handler|
    handler.routing_key == routing_key
  end
end
on(message_class, method_name=nil, opts={}, &block)
click to toggle source
# File lib/tochtli/base_controller.rb, line 58
# Registers a handler for a message class.
#
# The handler is either a method name or a block. The options hash may be
# passed in place of the method name when a block is used.
#
# @param message_class [Class] a subclass of Tochtli::Message
# @param method_name [Symbol, String, Hash, nil] handler method name, or the
#   options hash when a block is given
# @param opts [Hash] supports :routing_key to override the message class's key
# @param block handler used when no method name is given
# @return the handler collection with the new MessageRoute appended
# @raise [ArgumentError] when neither method name nor block is given, or when
#   message_class is not a Tochtli::Message subclass
# @raise [RuntimeError] when no routing key can be determined
def on(message_class, method_name=nil, opts={}, &block)
  # Allow `on(Message, routing_key: '...') { ... }`
  if method_name.is_a?(Hash)
    opts = method_name
    method_name = nil
  end

  handler = method_name || block
  raise ArgumentError, "Method name or block must be given" unless handler

  # Check for a Class first: calling `<` on nil or on a message instance
  # would raise NoMethodError instead of the intended ArgumentError.
  unless message_class.is_a?(Class) && message_class < Tochtli::Message
    raise ArgumentError, "Message class expected, got: #{message_class}"
  end

  routing_key = opts[:routing_key] || message_class.routing_key
  raise "Topic not set for message: #{message_class}" unless routing_key

  self.message_handlers << MessageRoute.new(message_class, handler, routing_key)
end
restart(options={})
click to toggle source
# File lib/tochtli/base_controller.rb, line 117
# Restarts the dispatcher when the controller is started; does nothing otherwise.
#
# The dispatcher's queues are captured before the restart and handed to both
# the :before_restart and :after_restart hooks.
#
# @param options [Hash] forwarded to the dispatcher's restart
# @return nil when not started, otherwise the :after_restart hook result
def restart(options={})
  return unless started?

  queues = dispatcher.queues
  run_hook(:before_restart, queues)
  dispatcher.restart(options)
  run_hook(:after_restart, queues)
end
set_up?()
click to toggle source
# File lib/tochtli/base_controller.rb, line 91
# Tells whether a dispatcher has been assigned to this controller.
#
# @return [Boolean]
def set_up?
  dispatcher ? true : false
end
setup(rabbit_connection, cache=nil, logger=nil)
click to toggle source
# File lib/tochtli/base_controller.rb, line 79
# Creates the dispatcher for this controller, wrapped in the
# :before_setup / :after_setup hooks.
#
# @param rabbit_connection connection passed to the hooks and the dispatcher
# @param cache optional cache passed to the dispatcher
# @param logger optional logger; Tochtli.logger is used when omitted
# @return the :after_setup hook result
def setup(rabbit_connection, cache=nil, logger=nil)
  run_hook(:before_setup, rabbit_connection)

  effective_logger = logger || Tochtli.logger
  self.dispatcher  = Dispatcher.new(self, rabbit_connection, cache, effective_logger)

  run_hook(:after_setup, rabbit_connection)
end
start(queue_name=nil, routing_keys=nil, initial_env={})
click to toggle source
# File lib/tochtli/base_controller.rb, line 85
# Starts the dispatcher, wrapped in the :before_start / :after_start hooks.
#
# The hooks receive the queue_name argument exactly as passed (possibly nil);
# the controller defaults are applied only to the dispatcher call.
#
# @param queue_name [String, nil] defaults to the controller's queue_name
# @param routing_keys [Enumerable, nil] defaults to the controller's routing_keys
# @param initial_env [Hash] passed to the hooks and the dispatcher
# @return the :after_start hook result
def start(queue_name=nil, routing_keys=nil, initial_env={})
  run_hook(:before_start, queue_name, initial_env)

  target_queue = queue_name || self.queue_name
  target_keys  = routing_keys || self.routing_keys
  self.dispatcher.start(target_queue, target_keys, initial_env)

  run_hook(:after_start, queue_name, initial_env)
end
started?(queue_name=nil)
click to toggle source
# File lib/tochtli/base_controller.rb, line 95
# Asks the dispatcher whether it is started, optionally for one queue.
#
# @param queue_name [String, nil] forwarded to the dispatcher's started?
# @return the dispatcher's answer, or a falsy value when no dispatcher is set
def started?(queue_name=nil)
  current = dispatcher
  current && current.started?(queue_name)
end
stop(options={})
click to toggle source
# File lib/tochtli/base_controller.rb, line 99
# Shuts the dispatcher down and clears it from the controller.
#
# :before_stop runs only when the controller is started. :after_stop runs
# whenever a dispatcher was present and receives the queues captured before
# shutdown, or nil when the controller was set up but not started.
#
# @param options [Hash] forwarded to the dispatcher's shutdown
# @return [Boolean] true when a dispatcher was shut down, false otherwise
def stop(options={})
  queues = nil
  if started?
    queues = dispatcher.queues
    run_hook(:before_stop, queues)
  end

  current = dispatcher
  return false unless current

  current.shutdown(options)
  self.dispatcher = nil
  run_hook(:after_stop, queues)
  true
end
Public Instance Methods
process_message(env)
click to toggle source
# File lib/tochtli/base_controller.rb, line 158
# Runs the handler described by +env+ against this controller instance.
#
# While the handler runs, env, message and delivery_info are available
# through instance state; all per-message state is cleared afterwards, even
# when the handler raises.
#
# @param env [Hash] reads :action (a Proc or a method name), :message and
#   :delivery_info
# @return the handler's return value
def process_message(env)
  @env           = env
  @action        = env[:action]
  @message       = env[:message]
  @delivery_info = env[:delivery_info]

  if @action.is_a?(Proc)
    # Block handlers run with the controller instance as self
    instance_eval(&@action)
  else
    send @action
  end
ensure
  # Clear @action as well, so the last handler (possibly a Proc holding a
  # closure) is not retained on the instance between messages.
  @env = @action = @message = @delivery_info = nil
end
rabbit_connection()
click to toggle source
# File lib/tochtli/base_controller.rb, line 189
# Returns the connection held by the controller class's dispatcher.
#
# @return the dispatcher's rabbit_connection, or nil when the class is not set up
def rabbit_connection
  controller_class = self.class
  return unless controller_class.set_up?

  controller_class.dispatcher.rabbit_connection
end
reply(reply_message, reply_to=nil, message_id=nil)
click to toggle source
# File lib/tochtli/base_controller.rb, line 174
# Publishes a reply message, correlated with the message being processed.
#
# When a message is currently being processed, its reply_to property and id
# are used for any argument that is not given explicitly.
#
# @param reply_message message to publish
# @param reply_to [String, nil] destination; defaults to the current message's reply_to
# @param message_id [String, nil] correlation id; defaults to the current message's id
# @return the result of publishing on the connection
# @raise [RuntimeError] when no reply_to destination can be determined
def reply(reply_message, reply_to=nil, message_id=nil)
  incoming = @message
  if incoming
    reply_to   ||= incoming.properties.reply_to
    message_id ||= incoming.id
  end

  unless reply_to
    raise "The 'reply_to' queue name is not specified"
  end

  logger.debug "\tSending reply on #{message_id} to #{reply_to}: #{reply_message.inspect}."
  @rabbit_connection.publish(reply_to, reply_message, correlation_id: message_id)
end