class ActivePubsub::Subscriber
Attributes
connection[RW]
record[RW]
Instance Methods ###
Public Class Methods
as(service_namespace)
click to toggle source
Class Methods ###
# File lib/active_pubsub/subscriber.rb, line 19 def self.as(service_namespace) self.local_service_namespace = service_namespace end
bind_subscriptions!()
click to toggle source
# File lib/active_pubsub/subscriber.rb, line 44 def self.bind_subscriptions! return if started? events.each_pair do |event_name, block| channel.queue(queue_for_event(event_name.to_s), queue_settings) .bind(exchange, :routing_key => routing_key_for_event(event_name)) .subscribe(subscribe_settings) do |delivery_info, properties, payload| deserialized_event = deserialize_event(payload) deserialized_record = deserialize_record(deserialized_event[:record]) subscriber_instance = new(deserialized_record) subscriber_instance.instance_exec(deserialized_record, &block) ::ActivePubsub.logger.info "#{delivery_info[:routing_key]} #{name} consumed #{deserialized_event}" channel.ack(delivery_info.delivery_tag) if ::ActivePubsub.config.ack end end self.started = true end
channel()
click to toggle source
# File lib/active_pubsub/subscriber.rb, line 28 def self.channel connection.channel end
clear_connections!()
click to toggle source
# File lib/active_pubsub/subscriber.rb, line 23 def self.clear_connections! channel.close connection.close end
deserialize_event(event)
click to toggle source
# File lib/active_pubsub/subscriber.rb, line 66 def self.deserialize_event(event) ::Marshal.load(event) end
deserialize_record(record)
click to toggle source
# File lib/active_pubsub/subscriber.rb, line 70 def self.deserialize_record(record) ::Marshal.load(record) end
exchange()
click to toggle source
# File lib/active_pubsub/subscriber.rb, line 32 def self.exchange channel.topic(exchange_name, exchange_settings) end
inherited(klass)
click to toggle source
# File lib/active_pubsub/subscriber.rb, line 36 def self.inherited(klass) klass.events = {} end
new(record)
click to toggle source
# File lib/active_pubsub/subscriber.rb, line 104 def initialize(record) @record = record end
observes(target_exchange)
click to toggle source
# File lib/active_pubsub/subscriber.rb, line 74 def self.observes(target_exchange) self.exchange_name = target_exchange end
on(event_name, &block)
click to toggle source
# File lib/active_pubsub/subscriber.rb, line 40 def self.on(event_name, &block) events[event_name] = block end
print_subscriptions!()
click to toggle source
# File lib/active_pubsub/subscriber.rb, line 86 def self.print_subscriptions! message = "Watching: \n" events.each_pair do |event_name, block| message << "Queue: #{queue_for_event(event_name.to_s)} \n" << "Routing Key: #{routing_key_for_event(event_name)} \n" << "\n" end ::ActivePubsub.logger.info(message) end
queue_for_event(event_name)
click to toggle source
# File lib/active_pubsub/subscriber.rb, line 78 def self.queue_for_event(event_name) [local_service_namespace, exchange_name, event_name].compact.join('.') end
routing_key_for_event(event_name)
click to toggle source
# File lib/active_pubsub/subscriber.rb, line 82 def self.routing_key_for_event(event_name) [exchange_name, event_name].join(".") end
started?()
click to toggle source
# File lib/active_pubsub/subscriber.rb, line 97 def self.started? self.started end