class Skein::Client::Subscriber

Public Class Methods

new(exchange, routing_key = nil, connection: nil, context: nil) click to toggle source

Instance Methods =====================================================

Calls superclass method Skein::Connected::new
# File lib/skein/client/subscriber.rb, line 4
def initialize(exchange, routing_key = nil, connection: nil, context: nil)
  super(connection: connection, context: context)

  @exchange =
    case (exchange)
    when String, Symbol
      self.channel.topic(exchange)
    else
      exchange
    end

  @subscribe_queue = self.channel.queue('', exclusive: true, durable: false)
  @subscribe_queue.bind(@exchange, routing_key: routing_key)
end

Public Instance Methods

close(delete_queue: false) click to toggle source
Calls superclass method Skein::Connected#close
# File lib/skein/client/subscriber.rb, line 40
def close(delete_queue: false)
  if (delete_queue)
    begin
      @subscribe_queue.delete
    rescue => e
      case (e.class.to_s)
      when 'Bunny::ChannelAlreadyClosed', 'MarchHare::ChannelAlreadyClosed'
        # Tried to delete, but this has already been shut down
      else
        raise e
      end
    end
  end

  super()
end
listen(block = true) { |load, delivery_info, properties| ... } click to toggle source
# File lib/skein/client/subscriber.rb, line 19
def listen(block = true)
  case (@subscribe_queue.class.to_s.split(/::/)[0])
  when 'Bunny'
    begin
      @subscribe_queue.subscribe(block: block) do |delivery_info, properties, payload|
        yield(JSON.load(payload), delivery_info, properties)
      end
    end
  when 'MarchHare'
    begin
      @subscribe_queue.subscribe(block: block) do |metadata, payload|
        yield(JSON.load(payload), metadata)
      end
    rescue MarchHare::ChannelAlreadyClosed
      # Connection got killed outside of thread, so shut-down and move on
    end
  else
    raise "Unknown queue type #{@subscribe_queue.class}, cannot listen."
  end
end