class Pakyow::Realtime::Server::Adapters::Redis

Manages websocket channels in redis.

Use this in production.

@api private

Constants

INFINITY
KEY_PART_SEPARATOR
KEY_PREFIX
PUBSUB_PREFIX

Public Class Methods

new(server, config) click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 24
def initialize(server, config)
  @server, @config = server, config
  @prefix = [@config[:key_prefix], KEY_PREFIX].join(KEY_PART_SEPARATOR)

  connect
  cleanup
end

Public Instance Methods

connect() click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 32
def connect
  @redis = ConnectionPool.new(**@config[:pool]) {
    ::Redis.new(@config[:connection])
  }

  @buffer = Buffer.new(@redis, pubsub_channel)
  @subscriber = Subscriber.new(::Redis.new(@config[:connection]), pubsub_channel) do |payload|
    channel, message = Marshal.restore(payload).values_at(:channel, :message)
    @server.transmit_message_to_connection_ids(message, socket_ids_for_channel(channel), raw: true)
  end
end
current!(socket_id, socket_instance_id) click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 120
def current!(socket_id, socket_instance_id)
  @redis.with do |redis|
    redis.set(key_socket_instance_id_by_socket_id(socket_id), socket_instance_id)
  end
end
current?(socket_id, socket_instance_id) click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 126
def current?(socket_id, socket_instance_id)
  @redis.with do |redis|
    redis.get(key_socket_instance_id_by_socket_id(socket_id)) == socket_instance_id.to_s
  end
end
disconnect() click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 44
def disconnect
  @subscriber.disconnect
end
expire(socket_id, seconds) click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 89
def expire(socket_id, seconds)
  time_expire = Time.now.to_i + seconds
  channels = channels_for_socket_id(socket_id)

  @redis.with do |redis|
    redis.multi do |transaction|
      channels.each do |channel|
        transaction.zadd(key_socket_ids_by_channel(channel), time_expire, socket_id)
      end

      transaction.expireat(key_channels_by_socket_id(socket_id), time_expire + 1)
      transaction.expireat(key_socket_instance_id_by_socket_id(socket_id), time_expire + 1)
    end
  end
end
persist(socket_id) click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 105
def persist(socket_id)
  channels = channels_for_socket_id(socket_id)

  @redis.with do |redis|
    redis.multi do |transaction|
      channels.each do |channel|
        transaction.zadd(key_socket_ids_by_channel(channel), INFINITY, socket_id)
      end

      transaction.persist(key_channels_by_socket_id(socket_id))
      transaction.persist(key_socket_instance_id_by_socket_id(socket_id))
    end
  end
end
socket_subscribe(socket_id, *channels) click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 48
def socket_subscribe(socket_id, *channels)
  @redis.with do |redis|
    redis.multi do |transaction|
      channels.each do |channel|
        channel = channel.to_s
        transaction.zadd(key_socket_ids_by_channel(channel), INFINITY, socket_id)
        transaction.zadd(key_channels_by_socket_id(socket_id), INFINITY, channel)
      end
    end
  end
end
socket_unsubscribe(*channels) click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 60
def socket_unsubscribe(*channels)
  @redis.with do |redis|
    channels.each do |channel|
      channel = channel.to_s

      # Channel could contain a wildcard, so this takes some work...
      redis.scan_each(match: key_socket_ids_by_channel(channel)) do |key|
        channel = key.split("channel:", 2)[1]

        socket_ids = redis.zrangebyscore(
          key, Time.now.to_i, INFINITY
        )

        redis.multi do |transaction|
          transaction.del(key)

          socket_ids.each do |socket_id|
            transaction.zrem(key_channels_by_socket_id(socket_id), channel)
          end
        end
      end
    end
  end
end
subscription_broadcast(channel, message) click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 85
def subscription_broadcast(channel, message)
  @buffer << Marshal.dump(channel: channel, message: { payload: message }.to_json)
end

Protected Instance Methods

build_key(*parts) click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 150
def build_key(*parts)
  [@prefix].concat(parts).join(KEY_PART_SEPARATOR)
end
channels_for_socket_id(socket_id) click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 142
def channels_for_socket_id(socket_id)
  @redis.with do |redis|
    redis.zrangebyscore(
      key_channels_by_socket_id(socket_id), INFINITY, INFINITY
    )
  end
end
cleanup() click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 170
def cleanup
  Concurrent::TimerTask.new(execution_interval: 300, timeout_interval: 300) {
    Pakyow.logger.internal {
      "[Pakyow::Realtime::Server::Adapters::Redis] Cleaning up channel keys"
    }

    removed_count = 0
    @redis.with do |redis|
      redis.scan_each(match: key_socket_ids_by_channel("*")) do |key|
        socket_ids = redis.zrangebyscore(
          key, Time.now.to_i, INFINITY
        )

        if socket_ids.empty?
          removed_count += 1
          redis.del(key)
        end
      end
    end

    Pakyow.logger.internal {
      "[Pakyow::Realtime::Server::Adapters::Redis] Removed #{removed_count} keys"
    }
  }.execute
end
key_channels_by_socket_id(socket_id) click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 158
def key_channels_by_socket_id(socket_id)
  build_key("socket_id:#{socket_id}")
end
key_socket_ids_by_channel(channel) click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 154
def key_socket_ids_by_channel(channel)
  build_key("channel:#{channel}")
end
key_socket_instance_id_by_socket_id(socket_id) click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 162
def key_socket_instance_id_by_socket_id(socket_id)
  build_key("socket_instance_id:#{socket_id}")
end
pubsub_channel() click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 166
def pubsub_channel
  [@prefix, PUBSUB_PREFIX].join(KEY_PART_SEPARATOR)
end
socket_ids_for_channel(channel) click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 134
def socket_ids_for_channel(channel)
  @redis.with do |redis|
    redis.zrangebyscore(
      key_socket_ids_by_channel(channel), INFINITY, INFINITY
    )
  end
end