class Firehose::Server::Channel

Connects to a specific channel on Redis and listens for messages to notify subscribers.

Attributes

channel_key[R]
list_key[R]
redis[R]
sequence_key[R]
subscriber[R]

Public Class Methods

new(channel_key, redis=self.class.redis, subscriber=self.class.subscriber) click to toggle source
# File lib/firehose/server/channel.rb, line 16
def initialize(channel_key, redis=self.class.redis, subscriber=self.class.subscriber)
  @redis        = redis
  @subscriber   = subscriber
  @channel_key  = channel_key
  @list_key     = Server::Redis.key(channel_key, :list)
  @sequence_key = Server::Redis.key(channel_key, :sequence)
end
redis() click to toggle source
# File lib/firehose/server/channel.rb, line 8
def self.redis
  @redis ||= Firehose::Server.redis.connection
end
subscriber() click to toggle source
# File lib/firehose/server/channel.rb, line 12
def self.subscriber
  @subscriber ||= Server::Subscriber.new(Firehose::Server.redis.connection)
end

Public Instance Methods

next_messages(consumer_sequence=nil, options={}) click to toggle source
# File lib/firehose/server/channel.rb, line 24
def next_messages(consumer_sequence=nil, options={})
  deferrable = EM::DefaultDeferrable.new
  deferrable.errback {|e| EM.next_tick { raise e } unless [:timeout, :disconnect].include?(e) }

  redis.multi
    redis.get(sequence_key).
      errback {|e| deferrable.fail e }
    # Fetch entire list: http://stackoverflow.com/questions/10703019/redis-fetch-all-value-of-list-without-iteration-and-without-popping
    redis.lrange(list_key, 0, -1).
      errback {|e| deferrable.fail e }
  redis.exec.callback do |(channel_sequence, message_list)|
    # Reverse the messages so they can be correctly procesed by the MessageBuffer class. There's
    # a patch in the message-buffer-redis branch that moves this concern into the Publisher LUA
    # script. We kept it out of this for now because it represents a deployment risk and `reverse!`
    # is a cheap operation in Ruby.
    message_list.reverse!
    buffer = MessageBuffer.new(message_list, channel_sequence, consumer_sequence)
    if buffer.remaining_messages.empty?
      Firehose.logger.debug "No messages in buffer, subscribing. sequence: `#{channel_sequence}` consumer_sequence: #{consumer_sequence}"
      # Either this resource has never been seen before or we are all caught up.
      # Subscribe and hope something gets published to this end-point.
      subscribe(deferrable, options[:timeout])
    else # Either the client is under water or caught up to head.
      deferrable.succeed buffer.remaining_messages
    end
  end.errback {|e| deferrable.fail e }

  deferrable
end
unsubscribe(deferrable) click to toggle source
# File lib/firehose/server/channel.rb, line 54
def unsubscribe(deferrable)
  subscriber.unsubscribe channel_key, deferrable
end

Private Instance Methods

subscribe(deferrable, timeout=nil) click to toggle source
# File lib/firehose/server/channel.rb, line 59
def subscribe(deferrable, timeout=nil)
  subscriber.subscribe(channel_key, deferrable)
  if timeout
    timer = EventMachine::Timer.new(timeout) do
      deferrable.fail :timeout
      unsubscribe deferrable
    end
    # Cancel the timer if when the deferrable succeeds
    deferrable.callback { timer.cancel }
  end
end