class Pakyow::Realtime::Server::Adapters::Memory

Manages websocket channels in memory.

Great for development, not for use in production!

@api private

Constants

SERIALIZABLE_IVARS

Public Class Methods

new(server, _config) click to toggle source
# File lib/pakyow/realtime/server/adapters/memory.rb, line 17
# Builds an adapter whose bookkeeping tables all start out empty.
#
# @param server [Object] kept in @server; must respond to
#   +transmit_message_to_connection_ids+, which #subscription_broadcast calls
# @param _config [Object] accepted but not used by this adapter
def initialize(server, _config)
  @server = server

  # Per-socket state: scheduled expiration tasks and the current instance id.
  @expirations_for_socket_id = Concurrent::Hash.new
  @socket_instances_by_socket_id = Concurrent::Hash.new

  # Subscriptions, indexed in both directions.
  @channels_by_socket_id = Concurrent::Hash.new
  @socket_ids_by_channel = Concurrent::Hash.new
end

Public Instance Methods

connect() click to toggle source
# File lib/pakyow/realtime/server/adapters/memory.rb, line 26
# Intentionally a no-op for this adapter; always returns nil.
def connect; end
current!(socket_id, socket_instance_id) click to toggle source
# File lib/pakyow/realtime/server/adapters/memory.rb, line 84
# Records +socket_instance_id+ as the current instance for +socket_id+,
# replacing any instance recorded earlier.
#
# @param socket_id [Object] key identifying the socket
# @param socket_instance_id [Object] instance to mark as current
# @return [Object] the +socket_instance_id+ that was stored
def current!(socket_id, socket_instance_id)
  @socket_instances_by_socket_id.store(socket_id, socket_instance_id)
end
current?(socket_id, socket_instance_id) click to toggle source
# File lib/pakyow/realtime/server/adapters/memory.rb, line 88
# Checks whether +socket_instance_id+ is the instance recorded for +socket_id+.
#
# A socket with no recorded instance looks up as nil, so it only matches a
# nil +socket_instance_id+.
#
# @param socket_id [Object] key identifying the socket
# @param socket_instance_id [Object] instance to compare against the recorded one
# @return [Boolean] result of comparing the recorded instance with +==+
def current?(socket_id, socket_instance_id)
  recorded_instance = @socket_instances_by_socket_id[socket_id]
  recorded_instance == socket_instance_id
end
disconnect() click to toggle source
# File lib/pakyow/realtime/server/adapters/memory.rb, line 30
# Intentionally a no-op for this adapter; always returns nil.
def disconnect; end
expire(socket_id, seconds) click to toggle source
# File lib/pakyow/realtime/server/adapters/memory.rb, line 62
# Schedules removal of everything recorded for a socket after a delay.
#
# When the task runs it removes the socket from each channel it is subscribed
# to, then drops the socket's channel list and its current-instance record.
# The task is also remembered under the socket id so that #persist can cancel
# it and #expiring? can report it.
#
# @param socket_id [Object] key identifying the socket to expire
# @param seconds [Numeric] delay handed to Concurrent::ScheduledTask.execute
# @return [Array] the tasks currently recorded for +socket_id+
def expire(socket_id, seconds)
  task = Concurrent::ScheduledTask.execute(seconds) {
    # Walk the channel list before the socket's own entries are dropped below.
    channels_for_socket_id(socket_id).each do |channel|
      # The channel entry may already have been removed; skip it instead of raising.
      @socket_ids_by_channel[channel]&.delete(socket_id)
    end

    # Done once, outside the loop, so a socket with no channels is still forgotten.
    @channels_by_socket_id.delete(socket_id)
    @socket_instances_by_socket_id.delete(socket_id)
  }

  (@expirations_for_socket_id[socket_id] ||= []) << task
end
expiring?(socket_id) click to toggle source
# File lib/pakyow/realtime/server/adapters/memory.rb, line 80
# Reports whether at least one expiration task is recorded for a socket.
#
# @param socket_id [Object] key identifying the socket
# @return [Boolean] true when a task is recorded; false (never nil) when the
#   socket has no entry or its task list is empty
def expiring?(socket_id)
  # Array() turns a missing entry (nil) into [], so the result is always a boolean.
  Array(@expirations_for_socket_id[socket_id]).any?
end
persist(socket_id) click to toggle source
# File lib/pakyow/realtime/server/adapters/memory.rb, line 75
# Cancels every expiration task recorded for a socket, then forgets them.
#
# @param socket_id [Object] key identifying the socket
# @return [Array, nil] the removed task list, or nil when none was recorded
def persist(socket_id)
  pending_tasks = @expirations_for_socket_id[socket_id]
  pending_tasks.each { |task| task.cancel } if pending_tasks

  @expirations_for_socket_id.delete(socket_id)
end
serialize() click to toggle source
# File lib/pakyow/realtime/server/adapters/memory.rb, line 98
# Snapshots the instance variables named in SERIALIZABLE_IVARS.
#
# @return [Hash] maps each listed instance variable name to its current value
def serialize
  SERIALIZABLE_IVARS.map { |ivar_name|
    [ivar_name, instance_variable_get(ivar_name)]
  }.to_h
end
socket_subscribe(socket_id, *channels) click to toggle source
# File lib/pakyow/realtime/server/adapters/memory.rb, line 34
# Subscribes a socket to one or more channels.
#
# Channel names are normalized to symbols. Both indexes are updated, and an
# entry is only added when it is not already present, so subscribing the same
# socket to the same channel again does not create duplicate entries (which
# would otherwise repeat the socket in the broadcast recipient list).
#
# @param socket_id [Object] key identifying the socket
# @param channels [Array<#to_s>] channel names to subscribe to
# @return [Array] the +channels+ that were passed in
def socket_subscribe(socket_id, *channels)
  channels.each do |channel|
    channel = channel.to_s.to_sym

    subscribers = (@socket_ids_by_channel[channel] ||= Concurrent::Array.new)
    subscribers << socket_id unless subscribers.include?(socket_id)

    subscriptions = (@channels_by_socket_id[socket_id] ||= Concurrent::Array.new)
    subscriptions << channel unless subscriptions.include?(channel)
  end
end
socket_unsubscribe(*channels) click to toggle source
# File lib/pakyow/realtime/server/adapters/memory.rb, line 42
def socket_unsubscribe(*channels)
  channels.each do |channel|
    channel = Regexp.new(channel.to_s)

    @socket_ids_by_channel.select { |key|
      key.to_s.match?(channel)
    }.each do |key, socket_ids|
      @socket_ids_by_channel.delete(key)

      socket_ids.each do |socket_id|
        @channels_by_socket_id[socket_id]&.delete(key)
      end
    end
  end
end
subscription_broadcast(channel, message) click to toggle source
# File lib/pakyow/realtime/server/adapters/memory.rb, line 58
# Hands a message to the server for delivery to the sockets selected by
# #socket_ids_for_channel for the given channel.
#
# @param channel [#to_s] channel pattern passed to #socket_ids_for_channel
# @param message [Object] payload forwarded to the server unchanged
# @return [Object] whatever +@server.transmit_message_to_connection_ids+ returns
def subscription_broadcast(channel, message)
  recipient_ids = socket_ids_for_channel(channel)
  @server.transmit_message_to_connection_ids(message, recipient_ids)
end

Protected Instance Methods

channels_for_socket_id(socket_id) click to toggle source
# File lib/pakyow/realtime/server/adapters/memory.rb, line 120
# Looks up the channels recorded for a socket.
#
# @param socket_id [Object] key identifying the socket
# @return [Array] the stored channel list itself (not a copy), or a new empty
#   array when nothing is recorded for +socket_id+
def channels_for_socket_id(socket_id)
  recorded = @channels_by_socket_id[socket_id]
  return recorded if recorded

  []
end
socket_ids_for_channel(channel) click to toggle source
# File lib/pakyow/realtime/server/adapters/memory.rb, line 106
# Collects the socket ids subscribed to every channel matching a pattern,
# leaving out sockets for which #expiring? is truthy.
#
# The argument is turned into a Regexp via +Regexp.new(channel.to_s)+ and
# tested against the string form of each channel key. A socket subscribed to
# several matching channels appears once per channel in the result.
#
# @param channel [#to_s] pattern selecting the channels
# @return [Array] socket ids in channel order, then subscription order
def socket_ids_for_channel(channel)
  pattern = Regexp.new(channel.to_s)

  @socket_ids_by_channel
    .select { |name, _socket_ids| pattern.match?(name.to_s) }
    .values
    .flat_map { |subscriber_ids| subscriber_ids.reject { |id| expiring?(id) } }
end