class Ably::Realtime::Presence::MembersMap

A class encapsulating a map of the members of this presence channel, indexed by the unique {Ably::Models::PresenceMessage#member_key}.

This map synchronises the membership of the presence set by handling SYNC messages from the service. Because sync messages can arrive out of order (e.g. a PRESENT sync event may be received after that member has in fact left), this map keeps “witness” entries, with the ABSENT action, to record that a LEAVE event has been seen for a member. These entries are cleared once the last set of updates in a sync sequence has been received.
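
The witness mechanism described above can be illustrated with a minimal, self-contained sketch. `ToyMembersMap`, its methods and the `Msg` struct are hypothetical names invented for this illustration, and ordering is decided here by a plain integer timestamp rather than by the library's actual newness rules.

```ruby
# Hypothetical toy model of witness entries; not the library's implementation.
Msg = Struct.new(:member_key, :action, :timestamp)

class ToyMembersMap
  def initialize
    @members = {}        # member_key => { present:, message: }
    @cleanup_queue = []  # keys of witness entries to drop once the sync ends
    @syncing = false
  end

  def start_sync
    @syncing = true
  end

  def apply(msg)
    existing = @members[msg.member_key]
    # Ignore anything that is not newer than what is already recorded
    return if existing && existing[:message].timestamp >= msg.timestamp

    if msg.action == :leave
      if @syncing
        # Keep an absent witness so a late PRESENT cannot resurrect the member
        @members[msg.member_key] = { present: false, message: msg }
        @cleanup_queue << msg.member_key
      else
        @members.delete(msg.member_key)
      end
    else
      @members[msg.member_key] = { present: true, message: msg }
    end
  end

  def end_sync
    # Witness entries are only needed while the sync is in progress
    @cleanup_queue.each do |key|
      entry = @members[key]
      @members.delete(key) if entry && !entry[:present]
    end
    @cleanup_queue.clear
    @syncing = false
  end

  def present_keys
    @members.select { |_, entry| entry[:present] }.keys
  end

  def size
    @members.size
  end
end

map = ToyMembersMap.new
map.apply(Msg.new("conn1:alice", :enter, 1))
map.start_sync
map.apply(Msg.new("conn1:alice", :leave, 5))   # real-time LEAVE arrives first
map.apply(Msg.new("conn1:alice", :present, 3)) # stale SYNC entry arrives later
during_sync = map.present_keys
map.end_sync
```

Without the witness entry, the stale PRESENT message would find no record for the member and would add it back, even though the member has already left.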

@api private

Constants

STATE

Attributes

sync_session_id[R]

Public Class Methods

new(presence)
# File lib/submodules/ably-ruby/lib/ably/realtime/presence/members_map.rb, line 31
def initialize(presence)
  @presence = presence

  @state = STATE(:initialized)

  # Two sets of members maintained
  # @members contains all members present on the channel
  # @local_members contains only this connection's members for the purpose of re-entering the member if channel continuity is lost
  reset_members
  reset_local_members

  @absent_member_cleanup_queue = []

  # Each SYNC session has a unique ID so that following SYNC
  # any members present in the map without this session ID are
  # not present according to Ably, see #RTP19
  @sync_session_id = -1

  setup_event_handlers
end

Public Instance Methods

count()
Alias for: length
each(&block)

Method to allow {MembersMap} to be {ruby-doc.org/core-2.1.3/Enumerable.html Enumerable}.

@note This method does not wait for the sync operation to complete, so it may return an incomplete set of members. Use {MembersMap#get} instead.

# File lib/submodules/ably-ruby/lib/ably/realtime/presence/members_map.rb, line 140
def each(&block)
  return to_enum(:each) unless block_given?
  present_members.each(&block)
end
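
The `each` pattern above, which returns an enumerator when no block is given, is the usual way to back Ruby's `Enumerable` mixin. A minimal sketch follows, with `ToyMembers` as a hypothetical stand-in class holding plain strings rather than presence messages.

```ruby
# Hypothetical stand-in for a members collection
class ToyMembers
  include Enumerable

  def initialize(names)
    @names = names
  end

  # Same shape as the method above: enumerator without a block, iteration with one
  def each(&block)
    return to_enum(:each) unless block_given?
    @names.each(&block)
  end
end

members = ToyMembers.new(%w[alice bob carol])
upcased = members.map(&:upcase) # provided by Enumerable via #each
first_two = members.first(2)    # provided by Enumerable via #each
enumerator = members.each       # no block given, so an Enumerator is returned
```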
enter_local_members()
# File lib/submodules/ably-ruby/lib/ably/realtime/presence/members_map.rb, line 155
def enter_local_members
  local_members.values.each do |member|
    logger.debug { "#{self.class.name}: Manually re-entering local presence member, client ID: #{member.client_id} with data: #{member.data}" }
    presence.enter_client_with_id(member.id, member.client_id, member.data).tap do |deferrable|
      deferrable.errback do |error|
        re_enter_error = Ably::Models::ErrorInfo.new(
          message: "unable to automatically re-enter presence channel for client_id '#{member.client_id}'. Source error code #{error.code} and message '#{error.message}'",
          code: Ably::Exceptions::Codes::UNABLE_TO_AUTOMATICALLY_REENTER_PRESENCE_CHANNEL
        )
        channel.emit :update, Ably::Models::ChannelStateChange.new(
          current: channel.state,
          previous: channel.state,
          event: Ably::Realtime::Channel::EVENT(:update),
          reason: re_enter_error,
          resumed: true
        )
      end
    end
  end
end
get(options = {}, &block)

Get the list of presence members

@param [Hash,String] options an options Hash to filter members
@option options [String] :client_id optional client_id filter for the member
@option options [String] :connection_id optional connection_id filter for the member
@option options [Boolean] :wait_for_sync defaults to true; if true, the get method waits for the initial presence sync following channel attachment to complete before returning the members present, otherwise it immediately returns the members currently present

@yield [Array<Ably::Models::PresenceMessage>] array of present members

@return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callbacks

# File lib/submodules/ably-ruby/lib/ably/realtime/presence/members_map.rb, line 83
def get(options = {}, &block)
  wait_for_sync = options.fetch(:wait_for_sync, true)
  deferrable    = Ably::Util::SafeDeferrable.new(logger)

  result_block = lambda do
    present_members.tap do |members|
      members.keep_if { |member| member.connection_id == options[:connection_id] } if options[:connection_id]
      members.keep_if { |member| member.client_id == options[:client_id] } if options[:client_id]
    end.tap do |members|
      safe_yield block, members if block_given?
      deferrable.succeed members
    end
  end

  if !wait_for_sync || sync_complete?
    result_block.call
  else
    # Must be defined before subsequent procs reference this callback
    reset_callbacks = nil

    sync_complete_callback = lambda do
      reset_callbacks.call if reset_callbacks
      result_block.call
    end

    sync_failed_callback = lambda do |error|
      reset_callbacks.call if reset_callbacks
      deferrable.fail error
    end

    reset_callbacks = lambda do
      off(&sync_complete_callback)
      off(&sync_failed_callback)
      channel.off(&sync_failed_callback)
    end

    unsafe_once(:sync_complete, &sync_complete_callback)
    unsafe_once(:failed, &sync_failed_callback)

    channel.unsafe_once(:detaching, :detached, :failed) do |error_reason|
      sync_failed_callback.call error_reason
    end
  end

  deferrable
end
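
The two optional filters in `get` are applied one after the other, so passing both narrows the result to members matching both. A minimal sketch of that filtering step in isolation follows; `filter_members` and the `Member` struct are hypothetical names, and the deferrable and sync-waiting logic is deliberately left out.

```ruby
# Hypothetical stand-in for a presence message
Member = Struct.new(:client_id, :connection_id)

# Hypothetical helper mirroring the two keep_if filters used in #get
def filter_members(members, options = {})
  members.dup.tap do |result|
    result.keep_if { |m| m.connection_id == options[:connection_id] } if options[:connection_id]
    result.keep_if { |m| m.client_id == options[:client_id] } if options[:client_id]
  end
end

everyone = [
  Member.new("alice", "c1"),
  Member.new("bob",   "c1"),
  Member.new("alice", "c2"),
]

by_client  = filter_members(everyone, client_id: "alice")
by_both    = filter_members(everyone, client_id: "alice", connection_id: "c2")
unfiltered = filter_members(everyone)
```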
length()

@!attribute [r] length
@return [Integer] number of present members known at this point in time; does not wait for the sync operation to complete

# File lib/submodules/ably-ruby/lib/ably/realtime/presence/members_map.rb, line 132
def length
  present_members.length
end
Also aliased as: count, size
local_members()

The local members present, i.e. members entered from this connection, which this library is therefore responsible for re-entering on the channel automatically if the channel loses continuity.

@return [Hash<String, PresenceMessage>]

@api private

# File lib/submodules/ably-ruby/lib/ably/realtime/presence/members_map.rb, line 151
def local_members
  @local_members
end
size()
Alias for: length
sync_serial_cursor_at_end?()

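Returns a truthy value when the channel serial in the ProtocolMessage SYNC is nil, or when the cursor after the ‘:’ is empty. A serial such as ‘cf30e75054887:psl_7g:client:189’ still carries a cursor and so is not at the end. Either of the first two cases indicates that there are no more SYNC messages.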

@api private

# File lib/submodules/ably-ruby/lib/ably/realtime/presence/members_map.rb, line 68
def sync_serial_cursor_at_end?
  sync_serial.nil? || sync_serial.to_s.match(/^[\w-]+:?$/)
end
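
To make the pattern's behaviour concrete, the sketch below applies the same regular expression to a few serials. `cursor_at_end?` is a hypothetical wrapper that takes the serial as an argument and coerces the result to a strict boolean; the sample serials other than the one quoted above are made up.

```ruby
# Hypothetical wrapper around the same pattern used in the method above
def cursor_at_end?(sync_serial)
  sync_serial.nil? || !!sync_serial.to_s.match(/^[\w-]+:?$/)
end

no_serial     = cursor_at_end?(nil)                               # no serial at all
empty_cursor  = cursor_at_end?("cf30e75054887:")                  # nothing after the ':'
no_separator  = cursor_at_end?("cf30e75054887")                   # the ':' is optional
cursor_remain = cursor_at_end?("cf30e75054887:psl_7g:client:189") # cursor still present
```

Only the last serial has content after the first ‘:’, so it is the only one for which the wrapper returns false.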
update_sync_serial(serial)

Update the SYNC serial from the ProtocolMessage so that SYNC can be resumed. If the serial is nil, or the part after the first : is empty, then the SYNC is complete.

@return [void]

@api private

# File lib/submodules/ably-ruby/lib/ably/realtime/presence/members_map.rb, line 58
def update_sync_serial(serial)
  @sync_serial = serial
end

Private Instance Methods

absent_member_cleanup_queue()
# File lib/submodules/ably-ruby/lib/ably/realtime/presence/members_map.rb, line 191
def absent_member_cleanup_queue
  @absent_member_cleanup_queue
end
absent_members()
# File lib/submodules/ably-ruby/lib/ably/realtime/presence/members_map.rb, line 368
def absent_members
  members.reject do |key, presence|
    presence.fetch(:present)
  end.map do |key, presence|
    presence.fetch(:message)
  end
end
add_presence_member(presence_message)
# File lib/submodules/ably-ruby/lib/ably/realtime/presence/members_map.rb, line 317
def add_presence_member(presence_message)
  logger.debug { "#{self.class.name}: Member '#{presence_message.member_key}' for event '#{presence_message.action}' #{members.has_key?(presence_message.member_key) ? 'updated' : 'added'}.\n#{presence_message.to_json}" }
  # Mutate the PresenceMessage so that the action is :present, see #RTP2d
  present_presence_message = presence_message.shallow_clone(action: Ably::Models::PresenceMessage::ACTION.Present)
  member_set_upsert present_presence_message, true
  presence.emit_message presence_message.action, presence_message
end
channel()
# File lib/submodules/ably-ruby/lib/ably/realtime/presence/members_map.rb, line 203
def channel
  presence.channel
end
clean_up_absent_members()
# File lib/submodules/ably-ruby/lib/ably/realtime/presence/members_map.rb, line 376
def clean_up_absent_members
  while member_to_remove = absent_member_cleanup_queue.shift
    logger.debug { "#{self.class.name}: Cleaning up absent member '#{member_to_remove.member_key}' after SYNC.\n#{member_to_remove.to_json}" }
    member_set_delete member_to_remove
  end
end
clean_up_members_not_present_after_sync()
# File lib/submodules/ably-ruby/lib/ably/realtime/presence/members_map.rb, line 383
def clean_up_members_not_present_after_sync
  members.select do |member_key, member|
    member.fetch(:sync_session_id) != sync_session_id
  end.each do |member_key, member|
    presence_message = member.fetch(:message).shallow_clone(action: Ably::Models::PresenceMessage::ACTION.Leave, id: nil)
    logger.debug { "#{self.class.name}: Fabricating a LEAVE event for member '#{presence_message.member_key}' was not present in recently completed SYNC session ID '#{sync_session_id}'.\n#{presence_message.to_json}" }
    member_set_delete member.fetch(:message)
    presence.emit_message Ably::Models::PresenceMessage::ACTION.Leave, presence_message
  end
end
client()
# File lib/submodules/ably-ruby/lib/ably/realtime/presence/members_map.rb, line 207
def client
  channel.client
end
connection()
# File lib/submodules/ably-ruby/lib/ably/realtime/presence/members_map.rb, line 215
def connection
  client.connection
end
ensure_presence_message_is_valid(presence_message)
# File lib/submodules/ably-ruby/lib/ably/realtime/presence/members_map.rb, line 269
def ensure_presence_message_is_valid(presence_message)
  return true if presence_message.connection_id

  error = Ably::Exceptions::ProtocolError.new("Protocol error, presence message is missing connectionId", 400, Ably::Exceptions::Codes::PROTOCOL_ERROR)
  logger.error { "PresenceMap: On channel '#{channel.name}' error: #{error}" }
end
logger()
# File lib/submodules/ably-ruby/lib/ably/realtime/presence/members_map.rb, line 211
def logger
  client.logger
end
member_set_delete(presence_message)
# File lib/submodules/ably-ruby/lib/ably/realtime/presence/members_map.rb, line 352
def member_set_delete(presence_message)
  members.delete presence_message.member_key
  if sync_complete? and presence_message.connection_id == connection.id
    local_members.delete presence_message.client_id
    logger.debug { "#{self.class.name}: Local member '#{presence_message.client_id}' deleted" }
  end
end
member_set_upsert(presence_message, present)
# File lib/submodules/ably-ruby/lib/ably/realtime/presence/members_map.rb, line 344
def member_set_upsert(presence_message, present)
  members[presence_message.member_key] = { present: present, message: presence_message, sync_session_id: sync_session_id }
  if presence_message.connection_id == connection.id
    local_members[presence_message.client_id] = presence_message
    logger.debug { "#{self.class.name}: Local member '#{presence_message.client_id}' added" }
  end
end
members()
# File lib/submodules/ably-ruby/lib/ably/realtime/presence/members_map.rb, line 179
def members
  @members
end
presence()
# File lib/submodules/ably-ruby/lib/ably/realtime/presence/members_map.rb, line 187
def presence
  @presence
end
present_members()
# File lib/submodules/ably-ruby/lib/ably/realtime/presence/members_map.rb, line 360
def present_members
  members.select do |key, presence|
    presence.fetch(:present)
  end.map do |key, presence|
    presence.fetch(:message)
  end
end
remove_presence_member(presence_message)
# File lib/submodules/ably-ruby/lib/ably/realtime/presence/members_map.rb, line 325
def remove_presence_member(presence_message)
  logger.debug { "#{self.class.name}: Member '#{presence_message.member_key}' removed.\n#{presence_message.to_json}" }

  if sync_complete?
    member_set_delete presence_message
  else
    member_set_upsert presence_message, false
    absent_member_cleanup_queue << presence_message
  end

  presence.emit_message presence_message.action, presence_message
end
reset_local_members()
# File lib/submodules/ably-ruby/lib/ably/realtime/presence/members_map.rb, line 199
def reset_local_members
  @local_members = Hash.new
end
reset_members()
# File lib/submodules/ably-ruby/lib/ably/realtime/presence/members_map.rb, line 195
def reset_members
  @members = Hash.new
end
setup_event_handlers()
# File lib/submodules/ably-ruby/lib/ably/realtime/presence/members_map.rb, line 219
def setup_event_handlers
  presence.__incoming_msgbus__.subscribe(:presence, :sync) do |presence_message|
    presence_message.decode(client.encoders, channel.options) do |encode_error, error_message|
      client.logger.error error_message
    end
    update_members_and_emit_events presence_message
  end

  # RTP5a
  channel.unsafe_on(:failed, :detached) do
    reset_members
    reset_local_members
  end

  unsafe_on(:sync_starting) do
    @sync_session_id += 1
  end

  unsafe_on(:sync_none) do
    @sync_session_id += 1
    # Immediately change to finalizing which will result in all members being cleaned up
    change_state :finalizing_sync
  end

  unsafe_on(:finalizing_sync) do
    clean_up_absent_members
    clean_up_members_not_present_after_sync
    change_state :sync_complete
  end
end
should_update_member?(new_message)

If the message received is older than the last known event for presence then skip (return false). This can occur during a SYNC operation. For example:

- SYNC starts
- LEAVE event received for clientId 5
- SYNC present event received for clientId 5 with a timestamp before the LEAVE event, because the LEAVE occurred before the SYNC operation completed

@return [Boolean] true when new_message is newer than the existing member in the PresenceMap

# File lib/submodules/ably-ruby/lib/ably/realtime/presence/members_map.rb, line 284
def should_update_member?(new_message)
  if members[new_message.member_key]
    existing_message = members[new_message.member_key].fetch(:message)

    # If both are messages published by clients (not fabricated), use the ID to determine newness, see #RTP2b2
    if new_message.id.start_with?(new_message.connection_id) && existing_message.id.start_with?(existing_message.connection_id)
      new_message_parts = new_message.id.match(/(\d+):(\d+)$/)
      existing_message_parts = existing_message.id.match(/(\d+):(\d+)$/)

      if !new_message_parts || !existing_message_parts
        logger.fatal { "#{self.class.name}: Message IDs for new message #{new_message.id} or old message #{existing_message.id} are invalid. \nNew message: #{new_message.to_json}" }
        return existing_message.timestamp < new_message.timestamp
      end

      # ID is in the format "connid:msgSerial:index" such as "aaaaaa:0:0"
      # if msgSerial is greater then the new_message should update the member
      # if msgSerial is equal and index is greater, then update the member
      if new_message_parts[1].to_i > existing_message_parts[1].to_i # msgSerial
        true
      elsif new_message_parts[1].to_i == existing_message_parts[1].to_i # msgSerial equal
        new_message_parts[2].to_i > existing_message_parts[2].to_i # compare index
      else
        false
      end
    else
      # This message is fabricated or could not be validated so rely on timestamps, see #RTP2b1
      new_message.timestamp > existing_message.timestamp
    end
  else
    true
  end
end
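
The ID-based branch of the comparison above can be isolated into a small sketch. `newer_by_id?` is a hypothetical helper, the IDs are assumed to follow the "connid:msgSerial:index" format mentioned in the source comments, and returning nil for unparseable IDs is a choice made for this sketch to signal that a caller would fall back to timestamps.

```ruby
# Hypothetical helper; IDs are assumed to look like "connid:msgSerial:index"
def newer_by_id?(new_id, existing_id)
  new_parts      = new_id.match(/(\d+):(\d+)$/)
  existing_parts = existing_id.match(/(\d+):(\d+)$/)
  # Unparseable IDs: signal that timestamps should be compared instead
  return nil unless new_parts && existing_parts

  # Compare [msgSerial, index] pairs numerically, msgSerial first
  new_pair      = [new_parts[1].to_i, new_parts[2].to_i]
  existing_pair = [existing_parts[1].to_i, existing_parts[2].to_i]
  (new_pair <=> existing_pair) == 1
end

higher_serial = newer_by_id?("aaaaaa:1:0",  "aaaaaa:0:5") # msgSerial is greater
higher_index  = newer_by_id?("aaaaaa:0:1",  "aaaaaa:0:0") # same msgSerial, greater index
identical     = newer_by_id?("aaaaaa:0:0",  "aaaaaa:0:0") # not newer
numeric_order = newer_by_id?("aaaaaa:10:0", "aaaaaa:9:0") # 10 > 9 as integers
unparseable   = newer_by_id?("fabricated",  "aaaaaa:0:0") # no serial and index to compare
```

Comparing the two integers as an array pair gives the same result as the nested conditional: the msgSerial decides first, and the index is only consulted when the msgSerials are equal.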
sync_serial()
# File lib/submodules/ably-ruby/lib/ably/realtime/presence/members_map.rb, line 183
def sync_serial
  @sync_serial
end
touch_presence_member(presence_message)

No update is necessary for this member because the message is older than, or no different from, the existing entry. However, the sync_session_id must still be updated so that this member is not removed following SYNC.

# File lib/submodules/ably-ruby/lib/ably/realtime/presence/members_map.rb, line 340
def touch_presence_member(presence_message)
  members.fetch(presence_message.member_key)[:sync_session_id] = sync_session_id
end
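
Why the touch matters can be seen in a small sketch of session-stamped entries. The member keys and the bare hashes below are hypothetical and carry only the `sync_session_id` field. The partition at the end mirrors the idea of removing members not seen in the current SYNC, not the library's exact code.

```ruby
# Hypothetical toy entries, each stamped with the session that last saw it
members = {
  "c1:alice" => { sync_session_id: 0 },
  "c1:bob"   => { sync_session_id: 0 },
}

sync_session_id = 1 # a new SYNC session has started

# alice appears in the SYNC with nothing newer to apply: only touch the stamp
members.fetch("c1:alice")[:sync_session_id] = sync_session_id

# When the SYNC ends, entries not stamped with the current session are stale
stale, current = members.partition do |_key, entry|
  entry[:sync_session_id] != sync_session_id
end

stale_keys   = stale.map(&:first)
current_keys = current.map(&:first)
```

Had alice's entry not been touched, she would be classed as stale alongside bob even though the SYNC confirmed she is still present.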
update_members_and_emit_events(presence_message)
# File lib/submodules/ably-ruby/lib/ably/realtime/presence/members_map.rb, line 250
def update_members_and_emit_events(presence_message)
  return unless ensure_presence_message_is_valid(presence_message)

  unless should_update_member?(presence_message)
    logger.debug { "#{self.class.name}: Skipped presence member #{presence_message.action} on channel #{presence.channel.name}.\n#{presence_message.to_json}" }
    touch_presence_member presence_message
    return
  end

  case presence_message.action
  when Ably::Models::PresenceMessage::ACTION.Enter, Ably::Models::PresenceMessage::ACTION.Update, Ably::Models::PresenceMessage::ACTION.Present
    add_presence_member presence_message
  when Ably::Models::PresenceMessage::ACTION.Leave
    remove_presence_member presence_message
  else
    Ably::Exceptions::ProtocolError.new("Protocol error, unknown presence action #{presence_message.action}", 400, Ably::Exceptions::Codes::PROTOCOL_ERROR)
  end
end