class Firehose::Rack::Consumer::WebSocket::DefaultHandler

Manages connection state for the web socket that’s connected by the Consumer::WebSocket class. Deals with message sequence, connection, failures, and subscription state.

Public Instance Methods

close(event) click to toggle source

Log a message that hte client has disconnected and reset the state for the class. Clean up the subscribers to the channels.

# File lib/firehose/rack/consumer/web_socket.rb, line 79
def close(event)
  if @deferrable
    @deferrable.fail :disconnect
    @channel.unsubscribe(@deferrable) if @channel
  end
  Firehose.logger.debug "WS connection `#{@req.path}` closing. Code: #{event.code.inspect}; Reason #{event.reason.inspect}"
end
message(event) click to toggle source

Manages messages sent from the connect client to the server. This is mostly used to handle heart-beats that are designed to prevent the WebSocket connection from timing out from inactivity.

# File lib/firehose/rack/consumer/web_socket.rb, line 60
def message(event)
  msg = parse_message(event)
  seq = msg[:message_sequence]
  if msg[:ping] == 'PING'
    Firehose.logger.debug "WS ping received, sending pong"
    send_message pong: "PONG"
  elsif !@subscribed && seq.kind_of?(Integer)
    Firehose.logger.debug "Subscribing at message_sequence #{seq}"
    subscribe seq
  end
end
open(event) click to toggle source

Log a message that the client has connected.

# File lib/firehose/rack/consumer/web_socket.rb, line 73
def open(event)
  Firehose.logger.debug "WebSocket subscribed to `#{@req.path}`. Waiting for message_sequence..."
end
subscribe(last_sequence) click to toggle source

Subscribe the client to the channel on the server. Asks for the last sequence for clients that reconnect.

# File lib/firehose/rack/consumer/web_socket.rb, line 89
def subscribe(last_sequence)
  @subscribed = true
  @channel    = Server::Channel.new @req.path
  @deferrable = @channel.next_messages last_sequence
  @deferrable.callback do |messages|
    messages.each do |message|
      Firehose.logger.debug "WS sent `#{message.payload}` to `#{@req.path}` with sequence `#{message.sequence}`"
      send_message message: message.payload, last_sequence: message.sequence
    end
    subscribe messages.last.sequence
  end
  @deferrable.errback do |e|
    unless e == :disconnect
      Firehose.logger.error "WS Error: #{e}"
      EM.next_tick { raise e.inspect }
    end
  end
end