class Deepstream::Client
Attributes
options[R]
state[R]
Public Class Methods
new(url, options = {})
click to toggle source
# File lib/deepstream/client.rb, line 26 def initialize(url, options = {}) @url = Helpers.url(url) @error_handler = ErrorHandler.new(self) @record_handler = RecordHandler.new(self) @event_handler = EventHandler.new(self) @options = Helpers.default_options.merge!(options) @message_buffer = [] @last_hearbeat = nil @challenge_denied, @@deliberate_close = false @state = CONNECTION_STATE::CLOSED @verbose = @options[:verbose] @reinitialize_master = @options[:reinitialize_master] @log = Async.logger @connected_before = false connect end
Public Instance Methods
close()
click to toggle source
# File lib/deepstream/client.rb, line 78 def close return unless connected? @deliberate_close = true log.info 'deliberate closing' if @verbose rescue => e on_exception(e) end
connected?()
click to toggle source
# File lib/deepstream/client.rb, line 93 def connected? @state != CONNECTION_STATE::CLOSED end
inspect()
click to toggle source
# File lib/deepstream/client.rb, line 105 def inspect "#{self.class} #{@url} | connection state: #{@state}" end
logged_in?()
click to toggle source
# File lib/deepstream/client.rb, line 101 def logged_in? @state == CONNECTION_STATE::OPEN end
login(credentials = @options[:credentials])
click to toggle source
# File lib/deepstream/client.rb, line 65 def login(credentials = @options[:credentials]) @options[:credentials] = credentials if @challenge_denied on_error("this client's connection was closed") elsif @state == CONNECTION_STATE::AUTHENTICATING send_message(TOPIC::AUTH, ACTION::REQUEST, @options[:credentials].to_json, priority: true) end self rescue => e on_exception(e) self end
on_message(data)
click to toggle source
# File lib/deepstream/client.rb, line 48 def on_message(data) message = Message.new(data) @log.info "Receiving msg = #{message.inspect}" if @verbose case message.topic when TOPIC::AUTH then authentication_message(message) when TOPIC::CONNECTION then connection_message(message) when TOPIC::EVENT then @event_handler.on_message(message) when TOPIC::ERROR then @error_handler.on_error(message) when TOPIC::RECORD then @record_handler.on_message(message) when TOPIC::RPC then raise(UnknownTopic, 'RPC is currently not implemented.') when nil then nil else raise(UnknownTopic, message) end rescue => e on_exception(e) end
on_open()
click to toggle source
# File lib/deepstream/client.rb, line 43 def on_open @log.info "Websocket connection opened" if @verbose @state = CONNECTION_STATE::AWAITING_CONNECTION end
reconnect()
click to toggle source
# File lib/deepstream/client.rb, line 86 def reconnect return if connected? @deliberate_close = false @state = CONNECTION_STATE::RECONNECTING @log.info 'Reconnecting' if @verbose end
reconnecting?()
click to toggle source
# File lib/deepstream/client.rb, line 97 def reconnecting? @state == CONNECTION_STATE::RECONNECTING end
send_message(*args, **kwargs)
click to toggle source
# File lib/deepstream/client.rb, line 109 def send_message(*args, **kwargs) message = Message.parse(*args) priority = kwargs[:priority] || false timeout = message.topic == TOPIC::EVENT ? kwargs[:timeout] : nil message.set_timeout(timeout) if timeout return unable_to_send_message(message, priority) if !logged_in? && message.needs_authentication? priority ? @message_buffer.unshift(message) : @message_buffer.push(message) rescue Errno::EPIPE unable_to_send_message(message, priority) rescue => e on_exception(e) end
Private Instance Methods
_connect(url = @url)
click to toggle source
# File lib/deepstream/client.rb, line 221 def _connect(url = @url) @log.info "Trying to connect to #{url}" if @verbose endpoint = Async::HTTP::Endpoint.parse(url) Async::WebSocket::Client.connect(endpoint) do |connection| on_open @task.async do loop do break if ( connection.closed? || @deliberate_close ) while !@message_buffer.empty? && (logged_in? || !@message_buffer[0].needs_authentication?) msg = @message_buffer.shift next if msg.expired? encoded_msg = msg.to_s.encode(Encoding::UTF_8) @log.info "Sending msg = #{msg.inspect}" if @verbose connection.write(encoded_msg) connection.flush rescue @message_buffer.unshift(msg) end @task.sleep 0.001 end end loop do on_message(connection.read) break if ( connection.closed? || @deliberate_close ) end rescue => e @log.error "Connection error #{e.message}" on_exception(e) ensure connection.close end on_close end
authentication_message(message)
click to toggle source
# File lib/deepstream/client.rb, line 144 def authentication_message(message) case message.action when ACTION::ACK then on_login when ACTION::ERROR then on_error(message) else raise(UnknownAction, message) end end
check_heartbeat()
click to toggle source
# File lib/deepstream/client.rb, line 188 def check_heartbeat return unless @last_heartbeat && Time.now - @last_heartbeat > 2 * @options[:heartbeat_interval] @state = CONNECTION_STATE::CLOSED on_error('Two connections heartbeats missed successively') end
connect(in_thread = @options[:in_thread])
click to toggle source
# File lib/deepstream/client.rb, line 199 def connect(in_thread = @options[:in_thread]) if in_thread Thread.start { connection_loop } else connection_loop end end
connection_loop()
click to toggle source
# File lib/deepstream/client.rb, line 207 def connection_loop Async do |task| @task = task loop do if @deliberate_close sleep 5 next end _connect sleep 5 end end end
connection_message(message)
click to toggle source
# File lib/deepstream/client.rb, line 132 def connection_message(message) case message.action when ACTION::ACK then on_connection_ack when ACTION::CHALLENGE then on_challenge when ACTION::ERROR then on_error(message) when ACTION::PING then on_ping when ACTION::REDIRECT then on_redirection(message) when ACTION::REJECTION then on_rejection else raise(UnknownAction, message) end end
on_challenge()
click to toggle source
# File lib/deepstream/client.rb, line 152 def on_challenge @state = CONNECTION_STATE::CHALLENGING send_message(TOPIC::CONNECTION, ACTION::CHALLENGE_RESPONSE, @url) end
on_close()
click to toggle source
# File lib/deepstream/client.rb, line 181 def on_close @log.info 'Websocket connection closed' if @verbose @state = CONNECTION_STATE::CLOSED rescue => e on_exception(e) end
on_connection_ack()
click to toggle source
# File lib/deepstream/client.rb, line 157 def on_connection_ack @state = CONNECTION_STATE::AUTHENTICATING @message_buffer.delete_if { |msg| msg.action == ACTION::PATCH } @record_handler.reinitialize if @connected_before login end
on_login()
click to toggle source
# File lib/deepstream/client.rb, line 169 def on_login @connected_before = true @state = CONNECTION_STATE::OPEN every(@options[:heartbeat_interval]) { check_heartbeat } if @options[:heartbeat_interval] resubscribe end
on_ping()
click to toggle source
# File lib/deepstream/client.rb, line 164 def on_ping @last_heartbeat = Time.now send_message(TOPIC::CONNECTION, ACTION::PONG) end
on_redirection(message)
click to toggle source
# File lib/deepstream/client.rb, line 194 def on_redirection(message) on_close @url = message.data.last end
on_rejection()
click to toggle source
# File lib/deepstream/client.rb, line 176 def on_rejection @challenge_denied = true on_close end
unable_to_send_message(message, priority)
click to toggle source
# File lib/deepstream/client.rb, line 124 def unable_to_send_message(message, priority) @state = CONNECTION_STATE::CLOSED if logged_in? unless message.expired? @log.info("Placing a message #{message.inspect} in the buffer, waiting for authentication") if @verbose priority ? @message_buffer.unshift(message) : @message_buffer.push(message) end end