class Firehose::Server::Channel
Connects to a specific channel on Redis
and listens for messages to notify subscribers.
Attributes
channel_key[R]
list_key[R]
redis[R]
sequence_key[R]
subscriber[R]
Public Class Methods
new(channel_key, redis=self.class.redis, subscriber=self.class.subscriber)
click to toggle source
# File lib/firehose/server/channel.rb, line 16 def initialize(channel_key, redis=self.class.redis, subscriber=self.class.subscriber) @redis = redis @subscriber = subscriber @channel_key = channel_key @list_key = Server::Redis.key(channel_key, :list) @sequence_key = Server::Redis.key(channel_key, :sequence) end
redis()
click to toggle source
# File lib/firehose/server/channel.rb, line 8 def self.redis @redis ||= Firehose::Server.redis.connection end
subscriber()
click to toggle source
# File lib/firehose/server/channel.rb, line 12 def self.subscriber @subscriber ||= Server::Subscriber.new(Firehose::Server.redis.connection) end
Public Instance Methods
next_messages(consumer_sequence=nil, options={})
click to toggle source
# File lib/firehose/server/channel.rb, line 24 def next_messages(consumer_sequence=nil, options={}) deferrable = EM::DefaultDeferrable.new deferrable.errback {|e| EM.next_tick { raise e } unless [:timeout, :disconnect].include?(e) } redis.multi redis.get(sequence_key). errback {|e| deferrable.fail e } # Fetch entire list: http://stackoverflow.com/questions/10703019/redis-fetch-all-value-of-list-without-iteration-and-without-popping redis.lrange(list_key, 0, -1). errback {|e| deferrable.fail e } redis.exec.callback do |(channel_sequence, message_list)| # Reverse the messages so they can be correctly procesed by the MessageBuffer class. There's # a patch in the message-buffer-redis branch that moves this concern into the Publisher LUA # script. We kept it out of this for now because it represents a deployment risk and `reverse!` # is a cheap operation in Ruby. message_list.reverse! buffer = MessageBuffer.new(message_list, channel_sequence, consumer_sequence) if buffer.remaining_messages.empty? Firehose.logger.debug "No messages in buffer, subscribing. sequence: `#{channel_sequence}` consumer_sequence: #{consumer_sequence}" # Either this resource has never been seen before or we are all caught up. # Subscribe and hope something gets published to this end-point. subscribe(deferrable, options[:timeout]) else # Either the client is under water or caught up to head. deferrable.succeed buffer.remaining_messages end end.errback {|e| deferrable.fail e } deferrable end
unsubscribe(deferrable)
click to toggle source
# File lib/firehose/server/channel.rb, line 54 def unsubscribe(deferrable) subscriber.unsubscribe channel_key, deferrable end
Private Instance Methods
subscribe(deferrable, timeout=nil)
click to toggle source
# File lib/firehose/server/channel.rb, line 59 def subscribe(deferrable, timeout=nil) subscriber.subscribe(channel_key, deferrable) if timeout timer = EventMachine::Timer.new(timeout) do deferrable.fail :timeout unsubscribe deferrable end # Cancel the timer if when the deferrable succeeds deferrable.callback { timer.cancel } end end