class Pakyow::Realtime::Server

Constants

HEARTBEAT_INTERVAL

Attributes

adapter[R]

Public Class Methods

new(adapter = :memory, adapter_config, timeout_config) click to toggle source
# File lib/pakyow/realtime/server.rb, line 18
# Builds a realtime server around the named adapter and queues the initial
# connection.
#
# Note that +adapter+ is a leading optional parameter: when only two
# arguments are given they bind to +adapter_config+ and +timeout_config+,
# and +adapter+ falls back to +:memory+.
#
# @param adapter [Symbol, String] adapter name. It selects the file
#   "pakyow/realtime/server/adapters/<adapter>" and, capitalized, the
#   constant looked up under +Adapters+.
# @param adapter_config handed to the adapter's constructor together with
#   this server instance.
# @param timeout_config stored in +@timeout_config+ for later use.
#
# A LoadError raised anywhere in the body is logged instead of propagated;
# the statements after the failing one are skipped, so the instance may be
# left only partially set up.
def initialize(adapter = :memory, adapter_config, timeout_config)
  require "pakyow/realtime/server/adapters/#{adapter}"
  @adapter = Adapters.const_get(adapter.to_s.capitalize).new(self, adapter_config)
  @sockets = Concurrent::Array.new
  @timeout_config = timeout_config
  @executor = Concurrent::SingleThreadExecutor.new(
    auto_terminate: false
  )

  connect
rescue LoadError => e
  # The adapter being loaded here is the realtime server adapter, so name it
  # as such in the log.
  Pakyow.logger.error "Failed to load realtime server adapter named `#{adapter}'"
  Pakyow.logger.error e.message
end

Public Instance Methods

connect() click to toggle source
# File lib/pakyow/realtime/server.rb, line 40
# Queues a job on +@executor+ that starts the heartbeat and then connects
# the adapter. Returns whatever +@executor <<+ returns.
def connect
  startup = lambda do
    start_heartbeat
    @adapter.connect
  end

  @executor << startup
end
disconnect() click to toggle source
# File lib/pakyow/realtime/server.rb, line 46
# Queues a job on +@executor+ that stops the heartbeat and then disconnects
# the adapter. Returns whatever +@executor <<+ returns.
def disconnect
  teardown = lambda do
    stop_heartbeat
    @adapter.disconnect
  end

  @executor << teardown
end
find_socket(id_or_socket) { |socket| ... } click to toggle source
# File lib/pakyow/realtime/server.rb, line 113
# Resolves +id_or_socket+ to a socket and yields it.
#
# A WebSocket instance is used as given; any other value is treated as an id
# and looked up through +find_socket_by_id+. The block is not called when the
# resolved value is nil or false.
#
# @return the block's result, or nil when nothing was yielded.
def find_socket(id_or_socket)
  resolved = id_or_socket.is_a?(WebSocket) ? id_or_socket : find_socket_by_id(id_or_socket)
  return unless resolved

  yield resolved
end
find_socket_by_id(socket_id) click to toggle source
# File lib/pakyow/realtime/server.rb, line 109
# Returns the first entry in +@sockets+ whose +id+ equals +socket_id+, or nil
# when there is no such entry.
def find_socket_by_id(socket_id)
  @sockets.find do |candidate|
    candidate.id == socket_id
  end
end
find_socket_id(id_or_socket) { |socket_id| ... } click to toggle source
# File lib/pakyow/realtime/server.rb, line 123
# Resolves +id_or_socket+ to a socket id and yields it.
#
# For a WebSocket instance the id is read from the socket; any other value is
# taken to be the id itself. The block is not called when the resolved id is
# nil or false.
#
# @return the block's result, or nil when nothing was yielded.
def find_socket_id(id_or_socket)
  resolved_id = id_or_socket.is_a?(WebSocket) ? id_or_socket.id : id_or_socket
  return unless resolved_id

  yield resolved_id
end
shutdown() click to toggle source
# File lib/pakyow/realtime/server.rb, line 33
# Shuts the server down: calls +disconnect+, then +shutdown+ on every entry in
# +@sockets+, then shuts down the executor and waits on it.
#
# @return the result of +@executor.wait_for_termination+, called with 30.
def shutdown
  disconnect

  @sockets.each do |socket|
    socket.shutdown
  end

  @executor.shutdown
  @executor.wait_for_termination(30)
end
socket_connect(id_or_socket) click to toggle source
# File lib/pakyow/realtime/server.rb, line 52
# Queues a job that registers a newly connected socket.
#
# When +find_socket+ yields a socket, the job appends it to +@sockets+, asks
# the adapter to persist its id, and marks this instance (by +object_id+) as
# the current one for that id.
#
# @return whatever +@executor <<+ returns.
def socket_connect(id_or_socket)
  registration = lambda do
    find_socket(id_or_socket) do |connected|
      @sockets << connected
      @adapter.persist(connected.id)
      @adapter.current!(connected.id, connected.object_id)
    end
  end

  @executor << registration
end
socket_disconnect(id_or_socket) click to toggle source
# File lib/pakyow/realtime/server.rb, line 62
# Queues a job that unregisters a disconnected socket.
#
# When +find_socket+ yields a socket, the job removes it from +@sockets+. If
# the adapter still reports this instance as current for the socket's id, the
# job also calls +leave+ on the socket and expires the id using
# +@timeout_config.disconnect+.
#
# @return whatever +@executor <<+ returns.
def socket_disconnect(id_or_socket)
  cleanup = lambda do
    find_socket(id_or_socket) do |departed|
      @sockets.delete(departed)

      # If this isn't the current instance for the socket id, it means that a
      # reconnect probably happened and the new socket connected before we
      # knew that the old one disconnected. Since there's a newer socket,
      # don't trigger leave events or expirations for the old one.
      #
      next unless @adapter.current?(departed.id, departed.object_id)

      departed.leave
      @adapter.expire(departed.id, @timeout_config.disconnect)
    end
  end

  @executor << cleanup
end
socket_subscribe(id_or_socket, *channels) click to toggle source
# File lib/pakyow/realtime/server.rb, line 80
# Queues a job that subscribes a socket to +channels+.
#
# When +find_socket_id+ yields an id, the job forwards the id and channels to
# the adapter and then expires the id using +@timeout_config.initial+.
#
# @return whatever +@executor <<+ returns.
def socket_subscribe(id_or_socket, *channels)
  subscription = lambda do
    find_socket_id(id_or_socket) do |resolved_id|
      @adapter.socket_subscribe(resolved_id, *channels)
      @adapter.expire(resolved_id, @timeout_config.initial)
    end
  end

  @executor << subscription
end
socket_unsubscribe(*channels) click to toggle source
# File lib/pakyow/realtime/server.rb, line 89
# Queues a job that forwards +channels+ to the adapter's +socket_unsubscribe+.
#
# @return whatever +@executor <<+ returns.
def socket_unsubscribe(*channels)
  unsubscription = lambda do
    @adapter.socket_unsubscribe(*channels)
  end

  @executor << unsubscription
end
subscription_broadcast(channel, message) click to toggle source
# File lib/pakyow/realtime/server.rb, line 95
# Queues a job that broadcasts +message+ through the adapter.
#
# The adapter receives +channel.to_s+ as its first argument, followed by the
# keyword arguments +channel:+ (set to +channel.name+) and +message:+.
#
# @return whatever +@executor <<+ returns.
def subscription_broadcast(channel, message)
  broadcast = lambda do
    @adapter.subscription_broadcast(
      channel.to_s,
      channel: channel.name,
      message: message
    )
  end

  @executor << broadcast
end
transmit_message_to_connection_ids(message, socket_ids, raw: false) click to toggle source

Called by the adapter, which guarantees that this server has connections for these ids.

# File lib/pakyow/realtime/server.rb, line 103
# Transmits +message+ to each socket found for the given ids.
#
# Every id is looked up with +find_socket_by_id+; ids for which the lookup
# returns nil are skipped.
#
# @param message passed to each socket's +transmit+.
# @param socket_ids collection of ids to iterate over.
# @param raw forwarded to +transmit+ as the +raw:+ keyword.
# @return the result of +socket_ids.each+.
def transmit_message_to_connection_ids(message, socket_ids, raw: false)
  socket_ids.each do |socket_id|
    recipient = find_socket_by_id(socket_id)
    next if recipient.nil?

    recipient.transmit(message, raw: raw)
  end
end

Private Instance Methods

start_heartbeat() click to toggle source
# File lib/pakyow/realtime/server.rb, line 135
# Creates a timer task, stores it in +@heartbeat+ and starts it.
#
# The task is configured with +HEARTBEAT_INTERVAL+ as its execution interval.
# Each time its block runs, it queues a job on +@executor+ that calls +beat+
# on every entry in +@sockets+.
#
# @return the result of calling +execute+ on the timer task.
def start_heartbeat
  beat_all = lambda do
    @sockets.each(&:beat)
  end

  @heartbeat = Concurrent::TimerTask.new(execution_interval: HEARTBEAT_INTERVAL) { @executor << beat_all }
  @heartbeat.execute
end
stop_heartbeat() click to toggle source
# File lib/pakyow/realtime/server.rb, line 145
# Shuts down the timer task stored in +@heartbeat+.
#
# Does nothing when no heartbeat has been stored. Without this guard a missing
# heartbeat would raise NoMethodError, and any code that runs after this call
# in the same job would be skipped.
#
# @return the result of the task's +shutdown+, or nil when there is no task.
def stop_heartbeat
  @heartbeat&.shutdown
end