class Firehose::Rack::Consumer::WebSocket::MultiplexingHandler
Public Class Methods
new(ws)
click to toggle source
Calls superclass method
Firehose::Rack::Consumer::WebSocket::Handler::new
# File lib/firehose/rack/consumer/web_socket.rb, line 117
# Sets up a multiplexing handler for the given websocket.
#
# Delegates to the superclass initializer, starts with an empty table of
# subscriptions keyed by channel name, then subscribes to whatever
# Consumer.multiplex_subscriptions returns for @req.
# NOTE(review): @req is not assigned here; presumably the superclass
# initializer sets it -- confirm.
#
# @param ws the websocket object, passed straight through to the superclass
def initialize(ws)
  super(ws)
  @subscriptions = {}
  initial_subscriptions = Consumer.multiplex_subscriptions(@req)
  subscribe_multiplexed(initial_subscriptions)
end
Public Instance Methods
close(event)
click to toggle source
# File lib/firehose/rack/consumer/web_socket.rb, line 145
# Closes every tracked subscription and then empties the subscription table.
#
# @param event the close event; not used by this method
# @return [Hash] the (now empty) subscription table
def close(event)
  @subscriptions.each_value { |subscription| subscription.close }
  @subscriptions.clear
end
message(event)
click to toggle source
# File lib/firehose/rack/consumer/web_socket.rb, line 123
# Dispatches one incoming websocket message.
#
# The parsed message is checked, in this order, for:
# * :multiplex_subscribe   - a single subscription or an Array of them,
#                            handed to subscribe_multiplexed
# * :multiplex_unsubscribe - channel name(s), handed to unsubscribe
# * :ping equal to 'PING'  - answered with a pong message
# The first match wins; a message matching none of them returns nil.
#
# @param event the websocket message event, passed to parse_message
def message(event)
  payload = parse_message(event)

  requested = payload[:multiplex_subscribe]
  if requested
    # A lone subscription is wrapped so the callee always receives an Array.
    wrapped = requested.is_a?(Array) ? requested : [requested]
    return subscribe_multiplexed(wrapped)
  end

  dropped = payload[:multiplex_unsubscribe]
  return unsubscribe(dropped) if dropped

  return unless payload[:ping] == 'PING'

  Firehose.logger.debug "WS ping received, sending pong"
  send_message pong: "PONG"
end
open(event)
click to toggle source
# File lib/firehose/rack/consumer/web_socket.rb, line 141
# Logs, at debug level, that a multiplexing websocket connected, including
# the request path.
#
# @param event the open event; not used by this method
def open(event)
  logger = Firehose.logger
  logger.debug("Multiplexing Websocket connected: #{@req.path}")
end
subscribe(channel_name, last_sequence)
click to toggle source
Subscribes the client to the channel on the server. Takes the last sequence the client received, which is needed for clients that reconnect.
# File lib/firehose/rack/consumer/web_socket.rb, line 163
# Subscribes the client to a channel and keeps the subscription going.
#
# Requests the next messages on the channel after +last_sequence+, records
# the resulting Subscription under +channel_name+, and registers two handlers
# on the returned deferrable:
# * on success, each message is sent to the client together with its channel
#   and sequence, then the channel is subscribed again starting from the last
#   delivered sequence;
# * on failure, the error is re-raised on the next reactor tick unless it is
#   :disconnect.
#
# @param channel_name the channel to subscribe to
# @param last_sequence the sequence to ask the channel for messages after
def subscribe(channel_name, last_sequence)
  channel    = Server::Channel.new channel_name
  deferrable = channel.next_messages last_sequence
  @subscriptions[channel_name] = Subscription.new(channel, deferrable)

  deferrable.callback do |messages|
    messages.each do |message|
      send_message(
        channel: channel_name,
        message: message.payload,
        last_sequence: message.sequence
      )
      Firehose.logger.debug "WS sent `#{message.payload}` to `#{channel_name}` with sequence `#{message.sequence}`"
    end

    # An empty batch has no last message; fall back to the sequence we asked
    # for so the callback does not raise on nil and the subscription stays alive.
    newest = messages.last
    subscribe channel_name, (newest ? newest.sequence : last_sequence)
  end

  deferrable.errback do |e|
    EM.next_tick { raise e.inspect } unless e == :disconnect
  end
end
subscribe_multiplexed(subscriptions)
click to toggle source
# File lib/firehose/rack/consumer/web_socket.rb, line 150
# Subscribes to each entry of a list of multiplexed subscriptions.
#
# Every entry is logged, then its :channel and :message_sequence are read.
# Entries whose :channel is nil are skipped; for the rest, the sequence is
# converted with to_i before being handed to subscribe.
#
# @param subscriptions a collection of entries indexable by :channel and
#   :message_sequence
def subscribe_multiplexed(subscriptions)
  subscriptions.each do |entry|
    Firehose.logger.debug "Subscribing multiplexed to: #{entry}"
    channel_name = entry[:channel]
    sequence = entry[:message_sequence]
    subscribe(channel_name, sequence.to_i) unless channel_name.nil?
  end
end
unsubscribe(channel_names)
click to toggle source
# File lib/firehose/rack/consumer/web_socket.rb, line 187
# Drops the subscriptions for the given channel name(s).
#
# For each name that has a tracked subscription, the subscription is closed
# and then removed from the table; names without one are ignored.
#
# @param channel_names a single channel name or a list of them (coerced
#   with Array())
def unsubscribe(channel_names)
  Firehose.logger.debug "Unsubscribing from channels: #{channel_names}"
  Array(channel_names).each do |name|
    existing = @subscriptions[name]
    next unless existing

    existing.close
    @subscriptions.delete(name)
  end
end