class ActivePubsub::Subscriber

Attributes

connection[RW]
record[RW]

Instance Methods

Public Class Methods

as(service_namespace) click to toggle source

Class Methods

# File lib/active_pubsub/subscriber.rb, line 19
# Records the namespace of the subscribing service.
#
# The value is handed to +local_service_namespace=+; +queue_for_event+ reads
# it back as the leading segment of the queue names it builds.
#
# @param service_namespace [Object] namespace to store (presumably a String
#   or Symbol -- confirm against callers)
# @return [Object] the +service_namespace+ that was passed in
def self.as(service_namespace)
  self.local_service_namespace = service_namespace
end
bind_subscriptions!() click to toggle source
# File lib/active_pubsub/subscriber.rb, line 44
# Declares one queue per registered event, binds it to the exchange and starts
# consuming from it. Does nothing when the subscriber is already started.
#
# For each delivery the payload is deserialized, a subscriber instance is
# built around the deserialized record, and the handler registered for the
# event is evaluated in the context of that instance (receiving the record as
# its argument). The consumption is logged and, when
# +::ActivePubsub.config.ack+ is truthy, the delivery is acknowledged.
#
# @return [true, nil] +true+ after the subscriptions were bound, +nil+ when
#   the subscriber had already been started
def self.bind_subscriptions!
  return if started?

  events.each_pair do |event_name, block|
    # +channel+ delegates to +connection.channel+ on every call. Resolve it
    # once per queue so the acknowledgement below is sent on the very channel
    # the queue (and its consumer) was declared on; delivery tags are only
    # meaningful on the channel that delivered the message.
    event_channel = channel

    event_channel.queue(queue_for_event(event_name.to_s), queue_settings)
                 .bind(exchange, :routing_key => routing_key_for_event(event_name))
                 .subscribe(subscribe_settings) do |delivery_info, _properties, payload|
      deserialized_event = deserialize_event(payload)
      deserialized_record = deserialize_record(deserialized_event[:record])

      # Run the handler with +self+ set to a fresh subscriber instance.
      subscriber_instance = new(deserialized_record)
      subscriber_instance.instance_exec(deserialized_record, &block)

      ::ActivePubsub.logger.info "#{delivery_info[:routing_key]} #{name} consumed #{deserialized_event}"

      event_channel.ack(delivery_info.delivery_tag) if ::ActivePubsub.config.ack
    end
  end

  self.started = true
end
channel() click to toggle source
# File lib/active_pubsub/subscriber.rb, line 28
# Asks the current +connection+ for a channel and returns it.
#
# NOTE(review): nothing is memoized here, so every call goes back to
# +connection.channel+. With clients that open a new channel per call this
# yields a different channel each time -- confirm against the connection class.
#
# @return [Object] whatever +connection.channel+ returns
def self.channel
  connection.channel
end
clear_connections!() click to toggle source
# File lib/active_pubsub/subscriber.rb, line 23
# Closes the channel returned by +channel+ and then the connection itself.
#
# The channel is closed first; if that raises, the connection is left open.
#
# @return [Object] the result of +connection.close+
def self.clear_connections!
  channel.close
  connection.close
end
deserialize_event(event) click to toggle source
# File lib/active_pubsub/subscriber.rb, line 66
# Rebuilds an event object from its marshalled form.
#
# NOTE(review): +Marshal.load+ can instantiate arbitrary objects, and
# +bind_subscriptions!+ feeds it the raw message payload. This is only safe
# when everything able to publish to the broker is trusted.
#
# @param event [String] data produced by +Marshal.dump+
# @return [Object] the deserialized event
def self.deserialize_event(event)
  ::Marshal.load(event)
end
deserialize_record(record) click to toggle source
# File lib/active_pubsub/subscriber.rb, line 70
# Rebuilds a record object from its marshalled form.
#
# NOTE(review): +Marshal.load+ can instantiate arbitrary objects; the input
# here originates from a consumed message (the +:record+ entry of the event),
# so it must only come from trusted publishers.
#
# @param record [String] data produced by +Marshal.dump+
# @return [Object] the deserialized record
def self.deserialize_record(record)
  ::Marshal.load(record)
end
exchange() click to toggle source
# File lib/active_pubsub/subscriber.rb, line 32
# Declares (or looks up) the topic exchange this subscriber observes.
#
# Delegates to +channel.topic+ with the configured +exchange_name+ and
# +exchange_settings+.
#
# @return [Object] whatever +channel.topic+ returns
def self.exchange
  channel.topic(exchange_name, exchange_settings)
end
inherited(klass) click to toggle source
# File lib/active_pubsub/subscriber.rb, line 36
# Subclass hook: gives every new subclass its own, empty event registry.
#
# +super+ is called first so that any other +inherited+ hook in the ancestor
# chain still runs; without it this override would silently swallow them.
#
# @param klass [Class] the class that was just created as a subclass
# @return [Hash] the empty events hash assigned to +klass+
def self.inherited(klass)
  super
  klass.events = {}
end
new(record) click to toggle source
# File lib/active_pubsub/subscriber.rb, line 104
# Builds a subscriber instance around a single record.
#
# @param record [Object] the record to keep in +@record+; inside
#   +bind_subscriptions!+ this is the deserialized record of the consumed event
def initialize(record)
  @record = record
end
observes(target_exchange) click to toggle source
# File lib/active_pubsub/subscriber.rb, line 74
# Declares which exchange this subscriber listens to.
#
# The value is stored via +exchange_name=+ and is later used when declaring
# the exchange and when building queue names and routing keys.
#
# @param target_exchange [Object] name of the exchange to observe
# @return [Object] the +target_exchange+ that was passed in
def self.observes(target_exchange)
  self.exchange_name = target_exchange
end
on(event_name, &block) click to toggle source
# File lib/active_pubsub/subscriber.rb, line 40
# Registers the handler to run when +event_name+ is consumed.
#
# A later registration for the same event name replaces the earlier one.
#
# @param event_name [Object] key under which the handler is stored
# @param block [Proc] handler; +bind_subscriptions!+ evaluates it in the
#   context of a subscriber instance
# @return [Proc, nil] the stored block
def self.on(event_name, &block)
  events.store(event_name, block)
end
print_subscriptions!() click to toggle source
queue_for_event(event_name) click to toggle source
# File lib/active_pubsub/subscriber.rb, line 78
# Builds the queue name for an event.
#
# The name is made of the local service namespace, the exchange name and the
# event name, joined with dots; +nil+ segments are dropped.
#
# @param event_name [Object] name of the event
# @return [String] dot-separated queue name
def self.queue_for_event(event_name)
  segments = [local_service_namespace, exchange_name, event_name]
  segments.compact.join('.')
end
routing_key_for_event(event_name) click to toggle source
# File lib/active_pubsub/subscriber.rb, line 82
# Builds the routing key used to bind an event's queue to the exchange.
#
# Unlike +queue_for_event+, +nil+ segments are not removed: a +nil+ exchange
# name yields a key with a leading dot.
#
# @param event_name [Object] name of the event
# @return [String] exchange name and event name joined with a dot
def self.routing_key_for_event(event_name)
  parts = [exchange_name, event_name]
  parts.join(".")
end
started?() click to toggle source
# File lib/active_pubsub/subscriber.rb, line 97
# Tells whether subscriptions have already been bound.
#
# @return [Object] the current value of +started+ (set to +true+ by
#   +bind_subscriptions!+), returned as-is without boolean coercion
def self.started?
  started
end