class Pakyow::Realtime::Server
Constants
- HEARTBEAT_INTERVAL
Attributes
adapter[R]
Public Class Methods
new(adapter = :memory, adapter_config, timeout_config)
click to toggle source
# File lib/pakyow/realtime/server.rb, line 18 def initialize(adapter = :memory, adapter_config, timeout_config) require "pakyow/realtime/server/adapters/#{adapter}" @adapter = Adapters.const_get(adapter.to_s.capitalize).new(self, adapter_config) @sockets = Concurrent::Array.new @timeout_config = timeout_config @executor = Concurrent::SingleThreadExecutor.new( auto_terminate: false ) connect rescue LoadError => e Pakyow.logger.error "Failed to load data subscriber store adapter named `#{adapter}'" Pakyow.logger.error e.message end
Public Instance Methods
connect()
click to toggle source
# File lib/pakyow/realtime/server.rb, line 40 def connect @executor << -> { start_heartbeat; @adapter.connect } end
disconnect()
click to toggle source
# File lib/pakyow/realtime/server.rb, line 46 def disconnect @executor << -> { stop_heartbeat; @adapter.disconnect } end
find_socket(id_or_socket) { |socket| ... }
click to toggle source
# File lib/pakyow/realtime/server.rb, line 113 def find_socket(id_or_socket) socket = if id_or_socket.is_a?(WebSocket) id_or_socket else find_socket_by_id(id_or_socket) end yield socket if socket end
find_socket_by_id(socket_id)
click to toggle source
# File lib/pakyow/realtime/server.rb, line 109 def find_socket_by_id(socket_id) @sockets.find { |socket| socket.id == socket_id } end
find_socket_id(id_or_socket) { |socket_id| ... }
click to toggle source
# File lib/pakyow/realtime/server.rb, line 123 def find_socket_id(id_or_socket) socket_id = if id_or_socket.is_a?(WebSocket) id_or_socket.id else id_or_socket end yield socket_id if socket_id end
shutdown()
click to toggle source
# File lib/pakyow/realtime/server.rb, line 33 def shutdown disconnect @sockets.each(&:shutdown) @executor.shutdown @executor.wait_for_termination(30) end
socket_connect(id_or_socket)
click to toggle source
# File lib/pakyow/realtime/server.rb, line 52 def socket_connect(id_or_socket) @executor << -> { find_socket(id_or_socket) do |socket| @sockets << socket @adapter.persist(socket.id) @adapter.current!(socket.id, socket.object_id) end } end
socket_disconnect(id_or_socket)
click to toggle source
# File lib/pakyow/realtime/server.rb, line 62 def socket_disconnect(id_or_socket) @executor << -> { find_socket(id_or_socket) do |socket| @sockets.delete(socket) # If this isn't the current instance for the socket id, it means that a # reconnect probably happened and the new socket connected before we # knew that the old one disconnected. Since there's a newer socket, # don't trigger leave events or expirations for the old one. # if @adapter.current?(socket.id, socket.object_id) socket.leave @adapter.expire(socket.id, @timeout_config.disconnect) end end } end
socket_subscribe(id_or_socket, *channels)
click to toggle source
# File lib/pakyow/realtime/server.rb, line 80 def socket_subscribe(id_or_socket, *channels) @executor << -> { find_socket_id(id_or_socket) do |socket_id| @adapter.socket_subscribe(socket_id, *channels) @adapter.expire(socket_id, @timeout_config.initial) end } end
socket_unsubscribe(*channels)
click to toggle source
# File lib/pakyow/realtime/server.rb, line 89 def socket_unsubscribe(*channels) @executor << -> { @adapter.socket_unsubscribe(*channels) } end
subscription_broadcast(channel, message)
click to toggle source
# File lib/pakyow/realtime/server.rb, line 95 def subscription_broadcast(channel, message) @executor << -> { @adapter.subscription_broadcast(channel.to_s, channel: channel.name, message: message) } end
transmit_message_to_connection_ids(message, socket_ids, raw: false)
click to toggle source
Called by the adapter, which guarantees that this server has connections for these ids.
# File lib/pakyow/realtime/server.rb, line 103 def transmit_message_to_connection_ids(message, socket_ids, raw: false) socket_ids.each do |socket_id| find_socket_by_id(socket_id)&.transmit(message, raw: raw) end end
Private Instance Methods
start_heartbeat()
click to toggle source
# File lib/pakyow/realtime/server.rb, line 135 def start_heartbeat @heartbeat = Concurrent::TimerTask.new(execution_interval: HEARTBEAT_INTERVAL) do @executor << -> { @sockets.each(&:beat) } end @heartbeat.execute end
stop_heartbeat()
click to toggle source
# File lib/pakyow/realtime/server.rb, line 145 def stop_heartbeat @heartbeat.shutdown end