module Cargosocket::Stream
Constants
- WS_OPT_KEYS
Public Class Methods
call(cargoenv)
click to toggle source
# File lib/cargosocket/stream.rb, line 7 def self.call(cargoenv) raise "requires stream adapter" unless cargoenv[:adapter] EM.run do ws_config = WS_OPT_KEYS.reduce({}) do |acc, key| acc[key] = cargoenv[key] if cargoenv.has_key?(key) next acc end EM::WebSocket.run(ws_config){ |ws| config_socket(cargoenv, ws) } end end
config_socket(cargoenv, ws)
click to toggle source
# File lib/cargosocket/stream.rb, line 19 def self.config_socket(cargoenv, ws) ws.onopen do |w| path = w.path.sub(/^\//, '') periodic_ping(cargoenv, ws) EM.add_timer(0.5){ setup_connection(cargoenv, ws, path, w.query) } end end
periodic_ping(cargoenv, ws)
click to toggle source
# File lib/cargosocket/stream.rb, line 27 def self.periodic_ping(cargoenv, ws) EM.add_periodic_timer(cargoenv[:ping_timer] || 5){ ws.ping } end
setup_connection(cargoenv, ws, path, query)
click to toggle source
# File lib/cargosocket/stream.rb, line 31 def self.setup_connection(cargoenv, ws, path, query) cb_adapter = StreamAdapters::CargobullAdapter channels = cb_adapter.channels(cargoenv, path, query) ref = cb_adapter.reference(cargoenv, path, query) if channels && ref state = cargoenv[:adapter].subscribe(*channels) do |channel, message| cb_adapter.pop(cargoenv, path, ref, channel, message, &ws.method(:send)) end state.keys.each do |channel| cb_adapter.subscribe(cargoenv, path, ref, channel) do |v| cargoenv[:adapter].push(channel, v) end end ws.onmessage do |message| state.keys.each do |channel| cb_adapter.push(cargoenv, path, ref, channel, message.strip){ |v| cargoenv[:adapter].push(channel, v) } end end ws.onerror do |error| cargoenv[:adapter].unsubscribe(state) do |channel| cb_adapter.error(cargoenv, path, ref, channel) do |v| cargoenv[:adapter].push(channel, v) end end end ws.onclose do cargoenv[:adapter].unsubscribe(state) do |channel| cb_adapter.unsubscribe(cargoenv, path, ref, channel) do |v| cargoenv[:adapter].push(channel, v) end end end else ws.close(3001, channels.inspect || ref.to_s || "unknown") end end