class Firehose::Rack::Consumer::WebSocket::MultiplexingHandler

Public Class Methods

new(ws) click to toggle source
# File lib/firehose/rack/consumer/web_socket.rb, line 117
# Sets up a multiplexing handler for the given socket.
#
# Delegates to the parent initializer first, starts with an empty table of
# subscriptions keyed by channel name, and then subscribes to whatever
# Consumer.multiplex_subscriptions extracts from @req.
# NOTE(review): @req is presumably assigned by the parent initializer — confirm.
def initialize(ws)
  super(ws)
  @subscriptions = {}
  initial_subscriptions = Consumer.multiplex_subscriptions(@req)
  subscribe_multiplexed(initial_subscriptions)
end

Public Instance Methods

close(event) click to toggle source
# File lib/firehose/rack/consumer/web_socket.rb, line 145
# Socket-closed hook: closes every tracked subscription and then empties the
# subscription table. The event argument is not used.
def close(event)
  @subscriptions.each { |_channel_name, subscription| subscription.close }
  @subscriptions.clear
end
message(event) click to toggle source
# File lib/firehose/rack/consumer/web_socket.rb, line 123
# Dispatches one incoming socket message.
#
# The parsed message (from parse_message) is inspected in priority order:
#   1. :multiplex_subscribe   - one subscription or an array of them; handed
#                               to subscribe_multiplexed as an array.
#   2. :multiplex_unsubscribe - channel name(s); handed to unsubscribe.
#   3. :ping equal to 'PING'  - answered with a pong message.
# Only the first matching branch runs; its result is returned. Returns nil
# when nothing matches.
def message(event)
  payload = parse_message(event)

  requested = payload[:multiplex_subscribe]
  if requested
    # A lone subscription is wrapped so the subscriber always gets an array.
    requested = [requested] unless requested.is_a?(Array)
    subscribe_multiplexed(requested)
  elsif (dropped = payload[:multiplex_unsubscribe])
    unsubscribe(dropped)
  elsif payload[:ping] == 'PING'
    Firehose.logger.debug "WS ping received, sending pong"
    send_message(pong: "PONG")
  end
end
open(event) click to toggle source
# File lib/firehose/rack/consumer/web_socket.rb, line 141
# Socket-opened hook: writes a debug log line containing the request path.
# The event argument is not used.
def open(event)
  Firehose.logger.debug("Multiplexing Websocket connected: #{@req.path}")
end
subscribe(channel_name, last_sequence) click to toggle source

Subscribes the client to the given channel on the server. Takes the last sequence the client has seen, so that clients that reconnect can continue from that point.

# File lib/firehose/rack/consumer/web_socket.rb, line 163
# Subscribes this socket to one channel, starting after last_sequence.
#
# Asks the channel for its next messages, records the pending request in
# @subscriptions under channel_name, and wires two handlers onto the
# returned deferrable:
#   * on success, each message is sent to the client (tagged with the channel
#     name and its sequence) and the method re-subscribes from the sequence
#     of the last message in the batch;
#   * on failure, anything other than :disconnect is re-raised on the next
#     EventMachine tick.
def subscribe(channel_name, last_sequence)
  channel = Server::Channel.new(channel_name)
  pending = channel.next_messages(last_sequence)

  # Register before attaching handlers so the request can be closed later.
  @subscriptions[channel_name] = Subscription.new(channel, pending)

  pending.callback do |batch|
    batch.each do |item|
      send_message(
        channel: channel_name,
        message: item.payload,
        last_sequence: item.sequence
      )
      Firehose.logger.debug "WS sent `#{item.payload}` to `#{channel_name}` with sequence `#{item.sequence}`"
    end

    # Keep the stream going from where this batch ended.
    subscribe(channel_name, batch.last.sequence)
  end

  pending.errback do |reason|
    # :disconnect is not treated as an error here; everything else is raised.
    next if reason == :disconnect

    EM.next_tick { raise reason.inspect }
  end
end
subscribe_multiplexed(subscriptions) click to toggle source
# File lib/firehose/rack/consumer/web_socket.rb, line 150
# Subscribes to every entry of a list of subscription descriptors.
#
# Each descriptor is indexed with :channel and :message_sequence. Entries
# whose :channel is nil are skipped; a missing sequence becomes 0 via to_i.
# Returns the list it was given (the result of each).
#
# If the channel is already present in @subscriptions, the existing
# subscription is closed and removed first. Otherwise a repeated subscribe
# request would overwrite the table entry while the earlier subscription kept
# running: it could no longer be reached by unsubscribe/close, and both would
# keep delivering messages for the same channel.
def subscribe_multiplexed(subscriptions)
  subscriptions.each do |sub|
    Firehose.logger.debug "Subscribing multiplexed to: #{sub}"

    channel, sequence = sub[:channel], sub[:message_sequence]
    next if channel.nil?

    # Same close-then-delete sequence that unsubscribe uses for one channel.
    if (existing = @subscriptions[channel])
      existing.close
      @subscriptions.delete(channel)
    end

    subscribe(channel, sequence.to_i)
  end
end
unsubscribe(channel_names) click to toggle source
# File lib/firehose/rack/consumer/web_socket.rb, line 187
# Stops the subscriptions for the given channel name(s).
#
# Accepts a single name or a list (normalised with Array()). For each name
# that has a tracked subscription, the subscription is closed and its entry
# removed; unknown names are ignored. Returns the normalised list of names.
def unsubscribe(channel_names)
  Firehose.logger.debug "Unsubscribing from channels: #{channel_names}"

  Array(channel_names).each do |name|
    subscription = @subscriptions[name]
    next unless subscription

    subscription.close
    @subscriptions.delete(name)
  end
end