class Slanger::PresenceChannel

Public Class Methods

new(attrs) click to toggle source
Calls superclass method Slanger::Channel::new
# File lib/slanger/presence_channel.rb, line 28
# Build the presence channel: run the superclass initializer with the same
# attributes, then subscribe to the Redis channel that carries subscription
# (connection) notifications.
def initialize(attrs)
  super(attrs)
  Slanger::Redis.subscribe("slanger:connection_notification")
end

Public Instance Methods

dispatch(message, channel) click to toggle source

Send an event received from Redis to the EventMachine channel

# File lib/slanger/presence_channel.rb, line 18
# Route a message received from Redis.
#
# Messages from a Redis channel whose name starts with "slanger:" describe
# subscription changes and are handed to update_subscribers. Every other
# message is serialized with Oj (compat mode) and pushed.
def dispatch(message, channel)
  return update_subscribers(message) if channel =~ /\Aslanger:/

  encoded = Oj.dump(message, mode: :compat)
  push(encoded)
end
ids() click to toggle source
# File lib/slanger/presence_channel.rb, line 59
# Unique user ids of the members currently present on this channel.
#
# `subscriptions` holds one entry per subscription, so a user with several
# subscriptions (e.g. multiple browser windows) appears more than once there.
# The ids are de-duplicated so this list agrees with `subscribers`, which is
# keyed by user id.
#
# @return [Array] user ids in first-seen order, without duplicates
def ids
  subscriptions.map { |_, v| v["user_id"] }.uniq
end
subscribe(msg, callback, &blk) click to toggle source
# File lib/slanger/presence_channel.rb, line 34
# Register a new presence subscription.
#
# Parses the channel data carried by +msg+, announces the subscription on the
# Redis notification channel, and stores the channel data in the Redis roster
# under a freshly generated public id. Once the announcement has been
# published, +callback+ is invoked and +blk+ is attached to the EM channel.
#
# Returns the public subscription id (a UUID string) right away. The EM channel
# subscription is only recorded after the publish callback fires.
def subscribe(msg, callback, &blk)
  raw_channel_data = msg["data"]["channel_data"]
  channel_data = Oj.strict_load(raw_channel_data)
  public_subscription_id = SecureRandom.uuid

  # Announce the new subscription on the slanger:connection_notification channel.
  notification = publish_connection_notification(
    subscription_id: public_subscription_id,
    online: true,
    channel_data: channel_data,
    channel: channel_id
  )

  # Keep the subscriber's data in Redis, keyed by the public id.
  roster_add(public_subscription_id, channel_data)

  # Local bookkeeping waits for the publish to succeed and is then deferred
  # to the next reactor tick.
  notification.callback do
    EM.next_tick do
      callback.call
      # Remember which EM channel subscription belongs to this public id.
      internal_subscription_table[public_subscription_id] = channel.subscribe(&blk)
    end
  end

  public_subscription_id
end
subscribers() click to toggle source
# File lib/slanger/presence_channel.rb, line 63
# Roster of present members as a Hash of user id => user info.
# When a user id occurs in several subscriptions, the last one seen wins.
def subscribers
  subscriptions.each_with_object({}) do |(_, member), roster|
    roster[member["user_id"]] = member["user_info"]
  end
end
unsubscribe(public_subscription_id) click to toggle source
# File lib/slanger/presence_channel.rb, line 67
# Tear down the subscription identified by +public_subscription_id+.
#
# Detaches the matching EM channel subscription, removes the subscriber's data
# from the Redis roster, and publishes an "offline" connection notification.
# Returns whatever publish_connection_notification returns.
def unsubscribe(public_subscription_id)
  # Forget the public id -> EM subscription id mapping and detach from the
  # EM channel. An unknown public id yields nil here, which is passed through.
  em_subscription_id = internal_subscription_table.delete(public_subscription_id)
  channel.unsubscribe(em_subscription_id)

  # Drop the subscriber's data from Redis.
  roster_remove(public_subscription_id)

  # Let every instance know this subscription went offline.
  publish_connection_notification(
    subscription_id: public_subscription_id,
    online: false,
    channel: channel_id
  )
end

Private Instance Methods

get_roster() click to toggle source
# File lib/slanger/presence_channel.rb, line 78
# Fetch this channel's roster (the Redis hash stored under channel_id) through
# Slanger::Redis.hgetall, wrapping the asynchronous call in a Fiber.
#
# NOTE(review): the outer `resume` returns as soon as the fiber reaches
# `Fiber.yield`, which yields no value, so this method appears to return nil
# whenever the hgetall callback fires later rather than synchronously. The
# result handed to `f.resume res` then looks to be discarded. `subscriptions`
# falls back to {} in that case. TODO confirm against how Slanger::Redis
# invokes its callbacks.
def get_roster
  # Read subscription infos from Redis.
  Fiber.new do
    # Keep a handle on this fiber so the Redis callback can resume it.
    f = Fiber.current
    Slanger::Redis.hgetall(channel_id).
      callback { |res| f.resume res }
    # Suspend this fiber; control goes back to the `resume` call below.
    Fiber.yield
  end.resume
end
internal_subscription_table() click to toggle source

This is used to map public subscription ids to EM channel subscription ids. EM channel subscription ids are incrementing integers, so they cannot be used as keys in a distributed system because they would not be unique across instances.

# File lib/slanger/presence_channel.rb, line 114
# Lazily created Hash mapping public subscription ids to EM channel
# subscription ids. The same Hash is returned on every call.
def internal_subscription_table
  return @internal_subscription_table if @internal_subscription_table

  @internal_subscription_table = {}
end
payload(event_name, payload = {}) click to toggle source
# File lib/slanger/presence_channel.rb, line 136
# Serialize an event for this channel.
#
# Wraps +event_name+ and +payload+ (default: empty Hash) together with
# channel_id in a Hash with the keys :channel, :event and :data, and dumps it
# with Oj in compat mode.
def payload(event_name, payload = {})
  envelope = { channel: channel_id, event: event_name, data: payload }
  Oj.dump(envelope, mode: :compat)
end
publish_connection_notification(payload, retry_count = 0) click to toggle source
# File lib/slanger/presence_channel.rb, line 98
# Publish a subscription notification on the global
# slanger:connection_notification Redis channel.
#
# +payload+ is dumped with Oj in compat mode. If the publish fails, it is
# attempted again with an incremented +retry_count+, unless +retry_count+
# equals 5.
#
# Returns the object returned by the first Slanger::Redis.publish call. The
# objects created by retries are not returned.
def publish_connection_notification(payload, retry_count = 0)
  encoded = Oj.dump(payload, mode: :compat)
  deferrable = Slanger::Redis.publish("slanger:connection_notification", encoded)

  deferrable.errback do
    publish_connection_notification(payload, retry_count.succ) unless retry_count == 5
  end

  deferrable
end
roster_add(key, value) click to toggle source
# File lib/slanger/presence_channel.rb, line 88
# Store +value+ under +key+ in this channel's Redis hash (named by channel_id).
# Returns whatever Slanger::Redis.hset returns.
def roster_add(key, value)
  redis = Slanger::Redis
  redis.hset(channel_id, key, value)
end
roster_remove(key) click to toggle source
# File lib/slanger/presence_channel.rb, line 93
# Delete +key+ from this channel's Redis hash (named by channel_id).
# Returns whatever Slanger::Redis.hdel returns.
def roster_remove(key)
  redis = Slanger::Redis
  redis.hdel(channel_id, key)
end
subscriptions() click to toggle source

This is the state of the presence channel across the system, kept in sync via Redis pub/sub.

# File lib/slanger/presence_channel.rb, line 107
# Memoized subscription state for this channel.
#
# On first use it is seeded from get_roster, falling back to an empty Hash
# when get_roster returns nil or false. Later calls return the same object.
def subscriptions
  return @subscriptions if @subscriptions

  @subscriptions = get_roster || {}
end
update_subscribers(message) click to toggle source
# File lib/slanger/presence_channel.rb, line 118
# Apply a connection notification to the local subscription state and push
# member_added / member_removed events where appropriate.
#
# Online: the channel data is stored under the subscription id. A
# "pusher_internal:member_added" event is pushed only when identical channel
# data is not already present (e.g. the user has another window open).
#
# Offline: the subscription is removed. A "pusher_internal:member_removed"
# event is pushed only when the subscription was known and no other
# subscription carries the same channel data.
def update_subscribers(message)
  unless message["online"]
    departed = subscriptions.delete(message["subscription_id"])
    # Nothing to announce for an unknown subscription.
    return unless departed
    # The same member is still connected through another subscription.
    return if subscriptions.has_value?(departed)

    return push(payload("pusher_internal:member_removed", { user_id: departed["user_id"] }))
  end

  channel_data = message["channel_data"]
  # Check before storing, so the new entry itself does not suppress the event.
  already_present = subscriptions.has_value?(channel_data)
  push(payload("pusher_internal:member_added", channel_data)) unless already_present
  subscriptions[message["subscription_id"]] = channel_data
end