class Firehose::Server::Subscriber
Sets up a connection to Redis
to listen for new resources…
Attributes
pubsub[R]
Public Class Methods
new(redis)
click to toggle source
# File lib/firehose/server/subscriber.rb, line 7
#
# Stores the pubsub handle of the given Redis connection, subscribes it to the
# channel-updates key, and installs a message handler that hands each incoming
# message to the deferrables registered for that message's channel.
#
# @param redis [Object] Redis connection; only its `pubsub` method is called here.
def initialize(redis)
  @pubsub = redis.pubsub
  # TODO: Instead of just raising an exception, it would probably be better
  #       for the errback to set some sort of 'disconnected' state. Then
  #       whenever a deferrable was 'subscribed' we could instantly fail
  #       the deferrable with whatever connection error we had.
  #       An alternative which would have a similar result would be to
  #       subscribe lazily (i.e. not until we have a deferrable to subscribe).
  #       Then, if connecting failed, it'd be super easy to fail the deferrable
  #       with the same error.
  #       The final goal is to allow the failed deferrable bubble back up
  #       so we can send back a nice, clean 500 error to the client.
  channel_updates_key = Server::Redis.key('channel_updates')
  # On subscription failure the error is re-raised on the next reactor tick.
  pubsub.subscribe(channel_updates_key).
    errback { |e| EM.next_tick { raise e } }.
    callback { Firehose.logger.debug "Redis subscribed to `#{channel_updates_key}`" }

  pubsub.on(:message) do |_, payload|
    channel_key, channel_sequence, message = Server::Publisher.from_payload(payload)
    messages = [MessageBuffer::Message.new(message, channel_sequence.to_i)]

    # Removing the channel's entry means every waiting deferrable is notified
    # once; it has to subscribe again to receive a later message.
    if (deferrables = subscriptions.delete(channel_key))
      Firehose.logger.debug "Redis notifying #{deferrables.count} deferrable(s) at `#{channel_key}` with channel_sequence `#{channel_sequence}` and message `#{message}`"
      deferrables.each do |deferrable|
        Firehose.logger.debug "Sending message #{message} and channel_sequence #{channel_sequence} to client from subscriber"
        deferrable.succeed messages
      end
    end
  end
end
Public Instance Methods
subscribe(channel_key, deferrable)
click to toggle source
# File lib/firehose/server/subscriber.rb, line 36
#
# Registers a deferrable to be notified about the given channel.
#
# @param channel_key [Object] key identifying the channel in `subscriptions`.
# @param deferrable [Object] object appended to that channel's waiting list.
# @return [Array] the channel's list of waiting deferrables after the append.
def subscribe(channel_key, deferrable)
  subscriptions[channel_key].push deferrable
end
unsubscribe(channel_key, deferrable)
click to toggle source
# File lib/firehose/server/subscriber.rb, line 40
#
# Removes a deferrable from the given channel's waiting list and drops the
# channel's entry entirely once no deferrables remain on it.
#
# @param channel_key [Object] key identifying the channel in `subscriptions`.
# @param deferrable [Object] the deferrable to remove.
# @return [Array, nil] the removed (now empty) list when the channel entry was
#   dropped, otherwise nil.
def unsubscribe(channel_key, deferrable)
  deferrables = subscriptions[channel_key]
  deferrables.delete deferrable
  # Drop empty entries so the hash does not accumulate idle channel keys.
  subscriptions.delete(channel_key) if deferrables.empty?
end
Private Instance Methods
subscriptions()
click to toggle source
# File lib/firehose/server/subscriber.rb, line 46
#
# Lazily built registry of waiting deferrables keyed by channel.
# Reading an unknown key stores and returns a fresh empty array for that key.
#
# @return [Hash] the memoized registry.
def subscriptions
  @subscriptions ||= Hash.new { |h, k| h[k] = [] }
end