class Pakyow::Realtime::Server::Adapters::Redis
Manages websocket channels in redis.
Use this in production.
@api private
Constants
- INFINITY
- KEY_PART_SEPARATOR
- KEY_PREFIX
- PUBSUB_PREFIX
Public Class Methods
new(server, config)
click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 24 def initialize(server, config) @server, @config = server, config @prefix = [@config[:key_prefix], KEY_PREFIX].join(KEY_PART_SEPARATOR) connect cleanup end
Public Instance Methods
connect()
click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 32 def connect @redis = ConnectionPool.new(**@config[:pool]) { ::Redis.new(@config[:connection]) } @buffer = Buffer.new(@redis, pubsub_channel) @subscriber = Subscriber.new(::Redis.new(@config[:connection]), pubsub_channel) do |payload| channel, message = Marshal.restore(payload).values_at(:channel, :message) @server.transmit_message_to_connection_ids(message, socket_ids_for_channel(channel), raw: true) end end
current!(socket_id, socket_instance_id)
click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 120 def current!(socket_id, socket_instance_id) @redis.with do |redis| redis.set(key_socket_instance_id_by_socket_id(socket_id), socket_instance_id) end end
current?(socket_id, socket_instance_id)
click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 126 def current?(socket_id, socket_instance_id) @redis.with do |redis| redis.get(key_socket_instance_id_by_socket_id(socket_id)) == socket_instance_id.to_s end end
disconnect()
click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 44 def disconnect @subscriber.disconnect end
expire(socket_id, seconds)
click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 89 def expire(socket_id, seconds) time_expire = Time.now.to_i + seconds channels = channels_for_socket_id(socket_id) @redis.with do |redis| redis.multi do |transaction| channels.each do |channel| transaction.zadd(key_socket_ids_by_channel(channel), time_expire, socket_id) end transaction.expireat(key_channels_by_socket_id(socket_id), time_expire + 1) transaction.expireat(key_socket_instance_id_by_socket_id(socket_id), time_expire + 1) end end end
persist(socket_id)
click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 105 def persist(socket_id) channels = channels_for_socket_id(socket_id) @redis.with do |redis| redis.multi do |transaction| channels.each do |channel| transaction.zadd(key_socket_ids_by_channel(channel), INFINITY, socket_id) end transaction.persist(key_channels_by_socket_id(socket_id)) transaction.persist(key_socket_instance_id_by_socket_id(socket_id)) end end end
socket_subscribe(socket_id, *channels)
click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 48 def socket_subscribe(socket_id, *channels) @redis.with do |redis| redis.multi do |transaction| channels.each do |channel| channel = channel.to_s transaction.zadd(key_socket_ids_by_channel(channel), INFINITY, socket_id) transaction.zadd(key_channels_by_socket_id(socket_id), INFINITY, channel) end end end end
socket_unsubscribe(*channels)
click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 60 def socket_unsubscribe(*channels) @redis.with do |redis| channels.each do |channel| channel = channel.to_s # Channel could contain a wildcard, so this takes some work... redis.scan_each(match: key_socket_ids_by_channel(channel)) do |key| channel = key.split("channel:", 2)[1] socket_ids = redis.zrangebyscore( key, Time.now.to_i, INFINITY ) redis.multi do |transaction| transaction.del(key) socket_ids.each do |socket_id| transaction.zrem(key_channels_by_socket_id(socket_id), channel) end end end end end end
subscription_broadcast(channel, message)
click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 85 def subscription_broadcast(channel, message) @buffer << Marshal.dump(channel: channel, message: { payload: message }.to_json) end
Protected Instance Methods
build_key(*parts)
click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 150 def build_key(*parts) [@prefix].concat(parts).join(KEY_PART_SEPARATOR) end
channels_for_socket_id(socket_id)
click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 142 def channels_for_socket_id(socket_id) @redis.with do |redis| redis.zrangebyscore( key_channels_by_socket_id(socket_id), INFINITY, INFINITY ) end end
cleanup()
click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 170 def cleanup Concurrent::TimerTask.new(execution_interval: 300, timeout_interval: 300) { Pakyow.logger.internal { "[Pakyow::Realtime::Server::Adapters::Redis] Cleaning up channel keys" } removed_count = 0 @redis.with do |redis| redis.scan_each(match: key_socket_ids_by_channel("*")) do |key| socket_ids = redis.zrangebyscore( key, Time.now.to_i, INFINITY ) if socket_ids.empty? removed_count += 1 redis.del(key) end end end Pakyow.logger.internal { "[Pakyow::Realtime::Server::Adapters::Redis] Removed #{removed_count} keys" } }.execute end
key_channels_by_socket_id(socket_id)
click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 158 def key_channels_by_socket_id(socket_id) build_key("socket_id:#{socket_id}") end
key_socket_ids_by_channel(channel)
click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 154 def key_socket_ids_by_channel(channel) build_key("channel:#{channel}") end
key_socket_instance_id_by_socket_id(socket_id)
click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 162 def key_socket_instance_id_by_socket_id(socket_id) build_key("socket_instance_id:#{socket_id}") end
pubsub_channel()
click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 166 def pubsub_channel [@prefix, PUBSUB_PREFIX].join(KEY_PART_SEPARATOR) end
socket_ids_for_channel(channel)
click to toggle source
# File lib/pakyow/realtime/server/adapters/redis.rb, line 134 def socket_ids_for_channel(channel) @redis.with do |redis| redis.zrangebyscore( key_socket_ids_by_channel(channel), INFINITY, INFINITY ) end end