class Discordrb::Gateway

Client for the Discord gateway protocol

Constants

GATEWAY_VERSION

The version of the gateway that's supposed to be used.

LARGE_THRESHOLD

How many members there need to be in a server for it to count as “large”

ZLIB_SUFFIX

The four-byte suffix that marks the end of a complete message when `:stream` compression is used (checked in handle_message).

Attributes

check_heartbeat_acks[RW]

Heartbeat ACKs are Discord's way of verifying on the client side whether the connection is still alive. If this is set to true (the default), the gateway client will use that functionality to detect zombie connections and reconnect in such a case; however, it may lead to instability if there's some problem with the ACKs. If this occurs, it can simply be set to false.

@return [true, false] whether or not this gateway should check for heartbeat ACKs.

Public Class Methods

new(bot, token, shard_key = nil, compress_mode = :stream)
# File lib/discordrb/gateway.rb, line 146
def initialize(bot, token, shard_key = nil, compress_mode = :stream)
  @token = token
  @bot = bot

  @shard_key = shard_key

  @getc_mutex = Mutex.new

  # Whether the connection to the gateway has succeeded yet
  @ws_success = false

  @check_heartbeat_acks = true

  @compress_mode = compress_mode
end

Public Instance Methods

heartbeat()

Sends a heartbeat with the last received packet's seq (to acknowledge that we have received it and all packets before it), or with 0 if none have been received yet.

@see send_heartbeat

# File lib/discordrb/gateway.rb, line 239
def heartbeat
  if check_heartbeat_acks
    unless @last_heartbeat_acked
      # We're in a bad situation - apparently the last heartbeat wasn't acked, which means the connection is likely
      # a zombie. Reconnect
      LOGGER.warn('Last heartbeat was not acked, so this is a zombie connection! Reconnecting')

      # We can't send anything on zombie connections
      @pipe_broken = true
      reconnect
      return
    end

    @last_heartbeat_acked = false
  end

  send_heartbeat(@session ? @session.sequence : 0)
end
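
The ACK bookkeeping above can be tried out in isolation. The following is a minimal sketch, not the real Gateway class: `AckTracker` and its `sent`/`reconnects` counters are hypothetical stand-ins for `send_heartbeat` and `reconnect`, and it assumes the ACK flag is reset to true whenever a connection starts over.

```ruby
# Hypothetical stand-in for the gateway's heartbeat ACK bookkeeping.
class AckTracker
  attr_reader :sent, :reconnects

  def initialize
    @last_acked = true # assumed reset state at the start of a connection
    @sent = []         # sequences we "sent" a heartbeat for
    @reconnects = 0    # how often a zombie connection was detected
  end

  # Called once per heartbeat interval.
  def heartbeat(sequence)
    unless @last_acked
      # The previous heartbeat was never acknowledged: treat the
      # connection as a zombie and reconnect instead of sending.
      @reconnects += 1
      @last_acked = true
      return
    end

    @last_acked = false
    @sent << sequence
  end

  # Called when a heartbeat ACK (op 11) arrives.
  def ack
    @last_acked = true
  end
end

tracker = AckTracker.new
tracker.heartbeat(1) # sent
tracker.ack          # acknowledged in time
tracker.heartbeat(2) # sent, but never acknowledged
tracker.heartbeat(3) # detects the missing ACK, nothing is sent
```

Setting `check_heartbeat_acks` to false corresponds to skipping the `unless` branch entirely, so heartbeats are always sent.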

identify()

Identifies to Discord with the default parameters.

@see send_identify

# File lib/discordrb/gateway.rb, line 267
def identify
  compress = @compress_mode == :large
  send_identify(@token, {
                  '$os': RUBY_PLATFORM,
                  '$browser': 'discordrb',
                  '$device': 'discordrb',
                  '$referrer': '',
                  '$referring_domain': ''
                }, compress, 100, @shard_key)
end

inject_error(e)

Injects a terminal gateway error into the handler. Useful for testing the reconnect logic.

@param e [Exception] The exception object to inject.

# File lib/discordrb/gateway.rb, line 232
def inject_error(e)
  handle_internal_close(e)
end

inject_reconnect(url = nil)

Injects a reconnect event (op 7) into the event processor, causing the client to reconnect to the given gateway URL. If the URL is set to nil, it will reconnect and get an entirely new gateway URL. This method has little use outside of testing and implementing highly custom reconnect logic.

@param url [String, nil] the URL to connect to or nil if one should be obtained from Discord.

# File lib/discordrb/gateway.rb, line 212
def inject_reconnect(url = nil)
  # When no URL is specified, the data should be nil, as is the case with Discord-sent packets.
  data = url ? { url: url } : nil

  handle_message({
    op: Opcodes::RECONNECT,
    d: data
  }.to_json)
end

inject_resume(seq)

Injects a resume packet (op 6) into the gateway. If this is done with a running connection, it will cause an error. It has no use outside of testing stuff that I know of, but if you want to use it anyway for some reason, here it is.

@param seq [Integer, nil] The sequence ID to inject, or nil if the currently tracked one should be used.

# File lib/discordrb/gateway.rb, line 226
def inject_resume(seq)
  send_resume(raw_token, @session_id, seq || @sequence)
end

kill()

Kills the websocket thread, stopping all connections to Discord.

# File lib/discordrb/gateway.rb, line 198
def kill
  @ws_thread.kill
end

notify_ready()

Notifies the {#run_async} method that everything is ready and the caller can now continue (i.e. with syncing, or with doing processing and then syncing)

# File lib/discordrb/gateway.rb, line 204
def notify_ready
  @ws_success = true
end

open?()

Whether the WebSocket connection to the gateway is currently open

# File lib/discordrb/gateway.rb, line 181
def open?
  @handshake&.finished? && !@closed
end

reconnect(attempt_resume = true)

Reconnects the gateway connection in a controlled manner.

@param attempt_resume [true, false] Whether a resume should be attempted after the reconnection.

# File lib/discordrb/gateway.rb, line 355
def reconnect(attempt_resume = true)
  @session.suspend if @session && attempt_resume

  @instant_reconnect = true
  @should_reconnect = true

  close
end

resume()

Resumes the session from the last recorded point.

@see send_resume

# File lib/discordrb/gateway.rb, line 349
def resume
  send_resume(@token, @session.session_id, @session.sequence)
end

run_async()

Connect to the gateway server in a separate thread

# File lib/discordrb/gateway.rb, line 163
def run_async
  @ws_thread = Thread.new do
    Thread.current[:discordrb_name] = 'websocket'
    connect_loop
    LOGGER.warn('The WS loop exited! Not sure if this is a good thing')
  end

  LOGGER.debug('WS thread created! Now waiting for confirmation that everything worked')
  sleep(0.5) until @ws_success
  LOGGER.debug('Confirmation received! Exiting run.')
end
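
The interplay between run_async, notify_ready and sync is a "start a thread, poll a flag, join later" pattern. The sketch below reproduces only that pattern with a hypothetical `Worker` class; the `sleep` calls stand in for connecting and for the websocket loop, and the timings are arbitrary.

```ruby
# Hypothetical worker that mirrors the run_async / notify_ready / sync flow.
class Worker
  def initialize
    @ready = false
  end

  # Starts the background thread and blocks until it reports readiness.
  def run_async
    @thread = Thread.new do
      sleep 0.05     # stands in for connecting and handshaking
      notify_ready
      sleep 0.05     # stands in for the long-running websocket loop
    end

    sleep 0.01 until @ready
  end

  # Lets the background thread tell the caller it may continue.
  def notify_ready
    @ready = true
  end

  # Blocks until the background thread has finished.
  def sync
    @thread.join
  end

  def ready?
    @ready
  end

  def running?
    @thread.alive?
  end
end

worker = Worker.new
worker.run_async              # returns as soon as the worker is ready
ready_after_start = worker.ready?
worker.sync                   # blocks until the thread exits
```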

send_heartbeat(sequence)

Sends a heartbeat packet (op 1). This tells Discord that the current connection is still active and that the last packets until the given sequence have been processed (in case of a resume).

@param sequence [Integer] The sequence number for which to send a heartbeat.

# File lib/discordrb/gateway.rb, line 261
def send_heartbeat(sequence)
  send_packet(Opcodes::HEARTBEAT, sequence)
end

send_identify(token, properties, compress, large_threshold, shard_key = nil)

Sends an identify packet (op 2). This starts a new session on the current connection and tells Discord who we are. This can only be done once per connection.

@param token [String] The token with which to authorise the session. If it belongs to a bot account, it must be prefixed with "Bot ".

@param properties [Hash<Symbol => String>] A list of properties for Discord to use in analytics. The following keys are recognised:

 - "$os" (recommended value: the operating system the bot is running on)
 - "$browser" (recommended value: library name)
 - "$device" (recommended value: library name)
 - "$referrer" (recommended value: empty)
 - "$referring_domain" (recommended value: empty)

@param compress [true, false] Whether certain large packets should be compressed using zlib.

@param large_threshold [Integer] The member threshold after which a server counts as large and will have to have its member list chunked.

@param shard_key [Array(Integer, Integer), nil] The shard key to use for sharding, represented as [shard_id, num_shards], or nil if the bot should not be sharded.

# File lib/discordrb/gateway.rb, line 296
def send_identify(token, properties, compress, large_threshold, shard_key = nil)
  data = {
    # Don't send a v anymore as it's entirely determined by the URL now
    token: token,
    properties: properties,
    compress: compress,
    large_threshold: large_threshold
  }

  # Don't include the shard key at all if it is nil as Discord checks for its mere existence
  data[:shard] = shard_key if shard_key

  send_packet(Opcodes::IDENTIFY, data)
end
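
To see what the identify payload looks like with and without sharding, the hash construction can be reproduced outside the gateway. This is a sketch only: `identify_data` is a hypothetical helper, and the token is a placeholder rather than a real credential.

```ruby
require 'json'

# Hypothetical helper that builds the `d` field of an identify packet
# the same way send_identify does, without sending anything.
def identify_data(token, properties, compress, large_threshold, shard_key = nil)
  data = {
    token: token,
    properties: properties,
    compress: compress,
    large_threshold: large_threshold
  }

  # The :shard key is left out entirely when no shard key is given.
  data[:shard] = shard_key if shard_key
  data
end

properties = {
  '$os': RUBY_PLATFORM,
  '$browser': 'discordrb',
  '$device': 'discordrb',
  '$referrer': '',
  '$referring_domain': ''
}

unsharded = identify_data('Bot placeholder-token', properties, false, 100)
sharded = identify_data('Bot placeholder-token', properties, false, 100, [0, 2])

puts JSON.pretty_generate(sharded)
```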

send_packet(opcode, packet)

Sends a custom packet over the connection. This can be useful to implement functionality that the library does not support yet, or for testing. You probably shouldn't use this unless you know what you're doing.

@param opcode [Integer] The opcode the packet should be sent as. Can be one of {Opcodes} or a custom value if necessary.

@param packet [Object] Some arbitrary JSON-serialisable data that should be sent as the `d` field.

# File lib/discordrb/gateway.rb, line 406
def send_packet(opcode, packet)
  data = {
    op: opcode,
    d: packet
  }

  send(data.to_json)
end
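
Every packet sent through send_packet shares the same two-field envelope. The sketch below builds that envelope for a request members packet (op 8) without touching a socket; `build_packet` and the guild ID are hypothetical.

```ruby
require 'json'

# Hypothetical helper: wraps a payload in the { op, d } envelope and
# serialises it, which is what send_packet does before sending.
def build_packet(opcode, payload)
  { op: opcode, d: payload }.to_json
end

# Op 8 (request members) for a made-up server ID, asking for all members.
frame = build_packet(8, { guild_id: 123, query: '', limit: 0 })
puts frame
```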

send_raw(data, type = :text)

Sends custom raw data over the connection. Only useful for testing; even if you know what you're doing you probably want to use {#send_packet} instead.

@param data [String] The data to send.

@param type [Symbol] The type the WebSocket frame should have; either `:text`, `:binary`, `:ping`, `:pong`, or `:close`.

# File lib/discordrb/gateway.rb, line 420
def send_raw(data, type = :text)
  send(data, type)
end

send_request_members(server_id, query, limit)

Sends a request members packet (op 8). This will order Discord to gradually send all requested members as dispatch events with type `GUILD_MEMBERS_CHUNK`. It is necessary to use this method in order to get all members of a large server (see `large_threshold` in {#send_identify}); however, it can also be used for other purposes.

@param server_id [Integer] The ID of the server whose members to query.

@param query [String] If this string is not empty, only members whose username starts with this string will be returned.

@param limit [Integer] How many members to send at maximum, or `0` to send all members.

# File lib/discordrb/gateway.rb, line 391
def send_request_members(server_id, query, limit)
  data = {
    guild_id: server_id,
    query: query,
    limit: limit
  }

  send_packet(Opcodes::REQUEST_MEMBERS, data)
end

send_resume(token, session_id, seq)

Sends a resume packet (op 6). This replays all events from a previous point specified by its packet sequence. This will not work if the packet to resume from has already been acknowledged using a heartbeat, or if the session ID belongs to a now invalid session.

If this packet is sent at the beginning of a connection, it will act similarly to an {#identify} in that it creates a session on the current connection. Unlike identify, however, this packet can also be sent in an existing session and will just replay some of the events.

@param token [String] The token that was used to identify the session to resume.

@param session_id [String] The session ID of the session to resume.

@param seq [Integer] The packet sequence of the packet after which the events should be replayed.

# File lib/discordrb/gateway.rb, line 374
def send_resume(token, session_id, seq)
  data = {
    token: token,
    session_id: session_id,
    seq: seq
  }

  send_packet(Opcodes::RESUME, data)
end

send_status_update(status, since, game, afk)

Sends a status update packet (op 3). This sets the bot user's status (online/idle/…) and game playing/streaming.

@param status [String] The status that should be set (`online`, `idle`, `dnd`, `invisible`).

@param since [Integer] The unix timestamp in milliseconds when the status was set. Should only be provided when `afk` is true.

@param game [Hash<Symbol => Object>, nil] `nil` if no game should be played, or a hash of `:game => "name"` if a game should be played. The hash can also contain additional attributes for streaming statuses.

@param afk [true, false] Whether the status was set due to inactivity on the user's part.

# File lib/discordrb/gateway.rb, line 318
def send_status_update(status, since, game, afk)
  data = {
    status: status,
    since: since,
    game: game,
    afk: afk
  }

  send_packet(Opcodes::PRESENCE, data)
end

send_voice_state_update(server_id, channel_id, self_mute, self_deaf)

Sends a voice state update packet (op 4). This packet can connect a user to a voice channel, update self mute/deaf status in an existing voice connection, move the user to a new voice channel on the same server or disconnect an existing voice connection.

@param server_id [Integer] The ID of the server on which this action should occur.

@param channel_id [Integer, nil] The channel ID to connect/move to, or `nil` to disconnect.

@param self_mute [true, false] Whether the user should itself be muted to everyone else.

@param self_deaf [true, false] Whether the user should be deaf towards other users.

# File lib/discordrb/gateway.rb, line 336
def send_voice_state_update(server_id, channel_id, self_mute, self_deaf)
  data = {
    guild_id: server_id,
    channel_id: channel_id,
    self_mute: self_mute,
    self_deaf: self_deaf
  }

  send_packet(Opcodes::VOICE_STATE, data)
end

stop(no_sync = false)

Stops the bot gracefully, disconnecting the websocket without immediately killing the thread. This means that Discord is immediately aware of the closed connection and makes the bot appear offline instantly.

If this method doesn't work or you're looking for something more drastic, use {#kill} instead.

# File lib/discordrb/gateway.rb, line 189
def stop(no_sync = false)
  @should_reconnect = false
  close(no_sync)

  # Return nil so command bots don't send a message
  nil
end

sync()

Prevents all further execution until the websocket thread stops (e.g. through a closed connection).

# File lib/discordrb/gateway.rb, line 176
def sync
  @ws_thread.join
end

Private Instance Methods

close(no_sync = false)
# File lib/discordrb/gateway.rb, line 810
def close(no_sync = false)
  # If we're already closed, there's no need to do anything - return
  return if @closed

  # Suspend the session so we don't send heartbeats
  @session&.suspend

  # Send a close frame (if we can)
  send nil, :close unless @pipe_broken

  # We're officially closed, notify the main loop.
  # This needs to be synchronised with the getc mutex, so the notification, and especially the actual
  # close afterwards, don't coincide with the main loop reading something from the SSL socket.
  # This would cause a segfault due to (I suspect) Ruby bug #12292: https://bugs.ruby-lang.org/issues/12292
  if no_sync
    @closed = true
  else
    @getc_mutex.synchronize { @closed = true }
  end

  # Close the socket if possible
  @socket&.close
  @socket = nil

  # Make sure we do necessary things as soon as we're closed
  handle_close(nil)
end

connect()
# File lib/discordrb/gateway.rb, line 543
def connect
  LOGGER.debug('Connecting')

  # Get the URI we should connect to
  url = process_gateway
  LOGGER.debug("Gateway URL: #{url}")

  # Parse it
  gateway_uri = URI.parse(url)

  # Zlib context for this gateway connection
  @zlib_reader = Zlib::Inflate.new

  # Connect to the obtained URI with a socket
  @socket = obtain_socket(gateway_uri)
  LOGGER.debug('Obtained socket')

  # Initialise some properties
  @handshake = ::WebSocket::Handshake::Client.new(url: url) # Represents the handshake between us and the server
  @handshaked = false # Whether the handshake has finished yet
  @pipe_broken = false # Whether we've received an EPIPE at any time
  @closed = false # Whether the websocket is currently closed

  # We're done! Delegate to the websocket loop
  websocket_loop
rescue StandardError => e
  LOGGER.error('An error occurred while connecting to the websocket!')
  LOGGER.log_exception(e)
end

connect_loop()
# File lib/discordrb/gateway.rb, line 455
def connect_loop
  # Initialize falloff so we wait for more time before reconnecting each time
  @falloff = 1.0

  @should_reconnect = true
  loop do
    connect

    break unless @should_reconnect

    if @instant_reconnect
      LOGGER.info('Instant reconnection flag was set - reconnecting right away')
      @instant_reconnect = false
    else
      wait_for_reconnect
    end

    # Restart the loop, i.e. reconnect
  end
end

find_gateway()
# File lib/discordrb/gateway.rb, line 523
def find_gateway
  response = API.gateway(@token)
  JSON.parse(response)['url']
end

handle_close(e)
# File lib/discordrb/gateway.rb, line 774
def handle_close(e)
  if e.respond_to? :code
    # It is a proper close frame we're dealing with, print reason and message to console
    LOGGER.error('Websocket close frame received!')
    LOGGER.error("Code: #{e.code}")
    LOGGER.error("Message: #{e.data}")
  elsif e.is_a? Exception
    # Log the exception
    LOGGER.error('The websocket connection has closed due to an error!')
    LOGGER.log_exception(e)
  else
    LOGGER.error("The websocket connection has closed: #{e.inspect}")
  end
end

handle_dispatch(packet)

Op 0

# File lib/discordrb/gateway.rb, line 694
def handle_dispatch(packet)
  data = packet['d']
  type = packet['t'].intern

  case type
  when :READY
    LOGGER.info("Discord using gateway protocol version: #{data['v']}, requested: #{GATEWAY_VERSION}")

    @session = Session.new(data['session_id'])
    @session.sequence = 0
  when :RESUMED
    # The RESUMED event is received after a successful op 6 (resume). It does nothing except tell the bot the
    # connection is initiated (like READY would). Starting with v5, it doesn't set a new heartbeat interval anymore
    # since that is handled by op 10 (HELLO).
    LOGGER.info 'Resumed'
    return
  end

  @bot.dispatch(type, data)
end

handle_error(e)
# File lib/discordrb/gateway.rb, line 638
def handle_error(e)
  LOGGER.error('An error occurred in the main websocket loop!')
  LOGGER.log_exception(e)
end

handle_handshake_data(recv_data)
# File lib/discordrb/gateway.rb, line 628
def handle_handshake_data(recv_data)
  @handshake << recv_data
  return unless @handshake.finished?

  @handshaked = true
  handle_open
end

handle_heartbeat(packet)

Op 1

# File lib/discordrb/gateway.rb, line 716
def handle_heartbeat(packet)
  # If we receive a heartbeat, we have to resend one with the same sequence
  send_heartbeat(packet['s'])
end

handle_heartbeat_ack(packet)

Op 11

# File lib/discordrb/gateway.rb, line 763
def handle_heartbeat_ack(packet)
  LOGGER.debug("Received heartbeat ack for packet: #{packet.inspect}")
  @last_heartbeat_acked = true if @check_heartbeat_acks
end

handle_hello(packet)

Op 10

# File lib/discordrb/gateway.rb, line 741
def handle_hello(packet)
  LOGGER.debug('Hello!')

  # The heartbeat interval is given in ms, so divide it by 1000 to get seconds
  interval = packet['d']['heartbeat_interval'].to_f / 1000.0
  setup_heartbeats(interval)

  LOGGER.debug("Trace: #{packet['d']['_trace']}")
  LOGGER.debug("Session: #{@session.inspect}")

  if @session&.should_resume?
    # Make sure we're sending heartbeats again
    @session.resume

    # Send the actual resume packet to get the missing events
    resume
  else
    identify
  end
end
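
The two decisions made on op 10 (converting the heartbeat interval and choosing between resume and identify) can be sketched separately from the connection. `SessionStub`, `heartbeat_interval_seconds` and `handshake_action` are hypothetical names, and the interval value in the sample packet is made up.

```ruby
require 'json'

# Hypothetical stand-in for the session object; only the predicate
# that handle_hello consults is modelled.
SessionStub = Struct.new(:resumable) do
  def should_resume?
    resumable
  end
end

# The interval arrives in milliseconds; heartbeats are scheduled in seconds.
def heartbeat_interval_seconds(packet)
  packet['d']['heartbeat_interval'].to_f / 1000.0
end

# Resume only if a session exists and reports that it should be resumed.
def handshake_action(session)
  session&.should_resume? ? :resume : :identify
end

hello = JSON.parse('{"op":10,"d":{"heartbeat_interval":41250}}')
interval = heartbeat_interval_seconds(hello)

first_connect = handshake_action(nil)
after_reconnect = handshake_action(SessionStub.new(true))
```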

handle_internal_close(e)

Called when the websocket has been disconnected in some way - say due to a pipe error while sending

# File lib/discordrb/gateway.rb, line 769
def handle_internal_close(e)
  close
  handle_close(e)
end

handle_invalidate_session()

Op 9

# File lib/discordrb/gateway.rb, line 728
def handle_invalidate_session
  LOGGER.debug('Received op 9, invalidating session and re-identifying.')

  if @session
    @session.invalidate
  else
    LOGGER.warn('Received op 9 without a running session! Not invalidating, we *should* be fine though.')
  end

  identify
end

handle_message(msg)
# File lib/discordrb/gateway.rb, line 645
def handle_message(msg)
  if @compress_mode == :large
    if msg.byteslice(0) == 'x'
      # The message is compressed, inflate it
      msg = Zlib::Inflate.inflate(msg)
    end
  elsif @compress_mode == :stream
    # Write deflated string to buffer
    @zlib_reader << msg

    # Check if message ends in `ZLIB_SUFFIX`
    return if msg.bytesize < 4 || msg.byteslice(-4, 4) != ZLIB_SUFFIX

    # Inflate the deflated buffer
    msg = @zlib_reader.inflate('')
  end

  # Parse packet
  packet = JSON.parse(msg)
  op = packet['op'].to_i

  LOGGER.in(packet)

  # If the packet has a sequence defined (all dispatch packets have one), make sure to update that in the
  # session so it will be acknowledged next heartbeat.
  # Only do this, of course, if a session has been created already; for a READY dispatch (which has s=0 set but is
  # the packet that starts the session in the first place) we need not do any handling since initialising the
  # session will set it to 0 by default.
  @session.sequence = packet['s'] if packet['s'] && @session

  case op
  when Opcodes::DISPATCH
    handle_dispatch(packet)
  when Opcodes::HELLO
    handle_hello(packet)
  when Opcodes::RECONNECT
    handle_reconnect
  when Opcodes::INVALIDATE_SESSION
    handle_invalidate_session
  when Opcodes::HEARTBEAT_ACK
    handle_heartbeat_ack(packet)
  when Opcodes::HEARTBEAT
    handle_heartbeat(packet)
  else
    LOGGER.warn("Received invalid opcode #{op} - please report with this information: #{msg}")
  end
end
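
The `:stream` branch above relies on one zlib context being shared across all messages of a connection. The sketch below simulates that with the standard library only. `StreamDecoder` is hypothetical, the suffix value is an assumption (the four bytes zlib emits at the end of a sync flush), and unlike handle_message it checks the suffix on its own accumulated buffer rather than on each incoming frame.

```ruby
require 'json'
require 'zlib'

# Assumed marker: the bytes zlib appends when it performs a sync flush.
SUFFIX = "\x00\x00\xFF\xFF".b

# Hypothetical decoder for a zlib-stream style connection.
class StreamDecoder
  def initialize
    @inflater = Zlib::Inflate.new # one context for the whole connection
    @buffer = String.new(encoding: Encoding::BINARY)
  end

  # Returns a parsed packet once a complete message has arrived, else nil.
  def feed(chunk)
    @buffer << chunk
    return nil if @buffer.bytesize < 4 || @buffer.byteslice(-4, 4) != SUFFIX

    json = @inflater.inflate(@buffer).force_encoding(Encoding::UTF_8)
    @buffer = String.new(encoding: Encoding::BINARY)
    JSON.parse(json)
  end
end

# Simulate the server side: a single deflate stream, sync-flushed per message.
deflater = Zlib::Deflate.new
first = deflater.deflate({ op: 10, d: { heartbeat_interval: 41_250 } }.to_json, Zlib::SYNC_FLUSH)
second = deflater.deflate({ op: 11, d: nil }.to_json, Zlib::SYNC_FLUSH)

decoder = StreamDecoder.new
partial = decoder.feed(first.byteslice(0, 3))                    # incomplete
hello = decoder.feed(first.byteslice(3, first.bytesize - 3))     # complete
ack = decoder.feed(second)                                       # same context
```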

handle_open()
# File lib/discordrb/gateway.rb, line 636
def handle_open; end

handle_reconnect()

Op 7

# File lib/discordrb/gateway.rb, line 722
def handle_reconnect
  LOGGER.debug('Received op 7, reconnecting and attempting resume')
  reconnect
end

obtain_socket(uri)

Create and connect a socket using a URI

# File lib/discordrb/gateway.rb, line 489
def obtain_socket(uri)
  socket = TCPSocket.new(uri.host, uri.port || socket_port(uri))

  if secure_uri?(uri)
    ctx = OpenSSL::SSL::SSLContext.new

    if ENV['DISCORDRB_SSL_VERIFY_NONE']
      ctx.ssl_version = 'SSLv23'
      ctx.verify_mode = OpenSSL::SSL::VERIFY_NONE # use VERIFY_PEER for verification

      cert_store = OpenSSL::X509::Store.new
      cert_store.set_default_paths
      ctx.cert_store = cert_store
    else
      ctx.set_params ssl_version: :TLSv1_2
    end

    socket = OpenSSL::SSL::SSLSocket.new(socket, ctx)
    socket.connect
  end

  socket
end

process_gateway()
# File lib/discordrb/gateway.rb, line 528
def process_gateway
  raw_url = find_gateway

  # Append a slash in case it's not there (I'm not sure how well WSCS handles it otherwise)
  raw_url += '/' unless raw_url.end_with? '/'

  query = if @compress_mode == :stream
            "?encoding=json&v=#{GATEWAY_VERSION}&compress=zlib-stream"
          else
            "?encoding=json&v=#{GATEWAY_VERSION}"
          end

  raw_url + query
end
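
The URL assembly can be checked without calling the API. In this sketch `gateway_url` is a hypothetical pure function, and both the base URL and the version number passed in are example values, not the library's actual GATEWAY_VERSION.

```ruby
# Hypothetical pure version of the URL assembly in process_gateway.
def gateway_url(raw_url, version, compress_mode)
  # Make sure there is exactly one trailing slash before the query string.
  url = raw_url.end_with?('/') ? raw_url : "#{raw_url}/"

  query = "?encoding=json&v=#{version}"
  # Transport compression is only requested in :stream mode.
  query += '&compress=zlib-stream' if compress_mode == :stream

  url + query
end

streamed = gateway_url('wss://gateway.discord.gg', 6, :stream)
plain = gateway_url('wss://gateway.discord.gg/', 6, :large)
```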

secure_uri?(uri)

Whether the URI is secure (connection should be encrypted)

# File lib/discordrb/gateway.rb, line 514
def secure_uri?(uri)
  %w[https wss].include? uri.scheme
end

send(data, type = :text)
# File lib/discordrb/gateway.rb, line 789
def send(data, type = :text)
  LOGGER.out(data)

  unless @handshaked && !@closed
    # If we're not handshaked or closed, it means there's no connection to send anything to
    raise 'Tried to send something to the websocket while not being connected!'
  end

  # Create the frame we're going to send
  frame = ::WebSocket::Frame::Outgoing::Client.new(data: data, type: type, version: @handshake.version)

  # Try to send it
  begin
    @socket.write frame.to_s
  rescue StandardError => e
    # There has been an error!
    @pipe_broken = true
    handle_internal_close(e)
  end
end

setup_heartbeats(interval)
# File lib/discordrb/gateway.rb, line 426
def setup_heartbeats(interval)
  # Make sure to reset ACK handling, so we don't keep reconnecting
  @last_heartbeat_acked = true

  # We don't want to have redundant heartbeat threads, so if one already exists, don't start a new one
  return if @heartbeat_thread

  @heartbeat_interval = interval
  @heartbeat_thread = Thread.new do
    Thread.current[:discordrb_name] = 'heartbeat'
    loop do
      begin
        # Send a heartbeat if either no session exists yet, or an existing session is not suspended (sessions are
        # suspended e.g. after op 7)
        if (@session && !@session.suspended?) || !@session
          sleep @heartbeat_interval
          @bot.raise_heartbeat_event
          heartbeat
        else
          sleep 1
        end
      rescue StandardError => e
        LOGGER.error('An error occurred while heartbeating!')
        LOGGER.log_exception(e)
      end
    end
  end
end

socket_port(uri)

The port we should connect to, if the URI doesn't have one set.

# File lib/discordrb/gateway.rb, line 519
def socket_port(uri)
  secure_uri?(uri) ? 443 : 80
end
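
The port selection used by obtain_socket can be exercised with Ruby's standard URI class. `secure?` and `port_for` are hypothetical free-standing versions of secure_uri? and the `uri.port || socket_port(uri)` expression; the host names are placeholders.

```ruby
require 'uri'

# Encrypted schemes get a TLS-wrapped socket.
def secure?(uri)
  %w[https wss].include?(uri.scheme)
end

# Prefer an explicit (or scheme-default) port, otherwise fall back to
# 443 for secure URIs and 80 for everything else.
def port_for(uri)
  uri.port || (secure?(uri) ? 443 : 80)
end

secure_port = port_for(URI.parse('wss://gateway.example.invalid'))
plain_port = port_for(URI.parse('ws://gateway.example.invalid'))
custom_port = port_for(URI.parse('ws://gateway.example.invalid:8080'))
```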

wait_for_reconnect()

Separate method to wait an ever-increasing amount of time before reconnecting after being disconnected in an unexpected way

# File lib/discordrb/gateway.rb, line 478
def wait_for_reconnect
  # We disconnected in an unexpected way! Wait before reconnecting so we don't spam Discord's servers.
  LOGGER.debug("Attempting to reconnect in #{@falloff} seconds.")
  sleep @falloff

  # Calculate new falloff
  @falloff *= 1.5
  @falloff = 115 + (rand * 10) if @falloff > 120 # Cap the falloff at 120 seconds and then add some random jitter
end
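
The resulting delay schedule is easier to see when the growth rule is pulled out into a pure function. `next_falloff` is a hypothetical helper; the `jitter` argument replaces the `rand` call so the sequence can be reproduced.

```ruby
# Hypothetical pure version of the falloff update in wait_for_reconnect.
# `jitter` is expected to be in 0...1, like the result of `rand`.
def next_falloff(current, jitter = rand)
  following = current * 1.5
  # Once the delay would exceed 120 seconds, clamp it to 115..125 seconds.
  following = 115 + (jitter * 10) if following > 120
  following
end

# The first wait is 1.0 seconds (set in connect_loop); each later wait
# is 1.5 times the previous one until the cap is reached.
delays = [1.0]
14.times { delays << next_falloff(delays.last, 0.5) }
puts delays.map { |d| d.round(2) }.join(', ')
```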

websocket_loop()
# File lib/discordrb/gateway.rb, line 573
def websocket_loop
  # Send the handshake data that we have so far
  @socket.write(@handshake.to_s)

  # Create a frame to handle received data
  frame = ::WebSocket::Frame::Incoming::Client.new

  until @closed
    begin
      unless @socket
        LOGGER.warn('Socket is nil in websocket_loop! Reconnecting')
        handle_internal_close('Socket is nil in websocket_loop')
      end

      recv_data = nil

      # Get some data from the socket, synchronised so the socket can't be closed during this
      # 24: remove locking
      @getc_mutex.synchronize { recv_data = @socket.getc }

      # Check if we actually got data
      unless recv_data
        # If we didn't, wait
        sleep 1
        next
      end

      # Check whether the handshake has finished yet
      if @handshaked
        # If it has, add the received data to the current frame
        frame << recv_data

        # Try to parse a message from the frame
        msg = frame.next
        while msg
          # Check whether the message is a close frame, and if it is, handle accordingly
          if msg.respond_to?(:code) && msg.code
            handle_internal_close(msg)
            break
          end

          # If there is one, handle it and try again
          handle_message(msg.data)
          msg = frame.next
        end
      else
        # If the handshake hasn't finished, handle it
        handle_handshake_data(recv_data)
      end
    rescue StandardError => e
      handle_error(e)
    end
  end
end