class CapistranoSentinel::WebsocketClient

Attributes

actor[RW]
auto_pong[RW]
on_error[RW]
on_message[RW]
on_ping[RW]
protocol_version[R]
read_buffer_size[RW]
read_thread[R]
reconnect[RW]
retry_time[RW]
socket[R]

Public Class Methods

new(opts) click to toggle source
host

Host of request. Required if no :url param was provided.

path

Path of request. Should start with '/'. Default: '/'

query

Query for request. Should be in format “aaa=bbb&ccc=ddd”

secure

Defines protocol to use. If true then wss://, otherwise ws://. This option will not change default port - it should be handled by programmer.

port

Port of request. Default: nil

opts

Additional options:

:reconnect - if true, it will try to reconnect
:retry_time - how often should retries happen when reconnecting [default = 1s]

Alternatively it can be called with a single hash where key names as symbols are the same as param names

# File lib/capistrano_sentinel/classes/websocket_client.rb, line 20
def initialize(opts)

  # Initializing with a single hash
  @options = opts.symbolize_keys

  @auto_pong   = @options.fetch(:auto_pong, nil) || CapistranoSentinel.config.auto_pong
  @read_buffer_size = @options.fetch(:read_buffer_size, nil) ||  CapistranoSentinel.config.read_buffer_size
  @reconnect = @options.fetch(:reconnect, nil) ||  CapistranoSentinel.config.reconnect
  @retry_time = @options.fetch(:retry_time, nil) ||  CapistranoSentinel.config.retry_time


  @secure  = @options.fetch(:secure, nil) || CapistranoSentinel.config.secure

  @host    = @options.fetch(:host, nil) || CapistranoSentinel.config.host
  @port    = @secure ? 443 : (@options.fetch(:port, nil) ||  CapistranoSentinel.config.port)
  @path    = @options.fetch(:path, nil) ||  CapistranoSentinel.config.path
  @query   = @options.fetch(:query, nil)

  @actor ||=  @options.fetch(:actor, nil)
  @channel ||= @options.fetch(:channel, nil)


  @closed      = false
  @opened      = false

  @on_open    = lambda {
    log_to_file("native websocket client  websocket connection opened")
    subscribe(@channel) if @channel.present?
  }

  @on_close   = lambda { |message|
    log_to_file("#{@actor.class} client received on_close  #{message.inspect}")
    if @actor.present? && @actor.respond_to?(:on_close)
      if @actor.respond_to?(:async)
        @actor.async.on_close(message)
      else
        @actor.on_close(message)
      end
    end
  }

  @on_ping    = lambda { |message|
    log_to_file("#{@actor.class} client received PING  #{message.inspect}")
    if @actor.present? && @actor.respond_to?(:on_ping)
      if @actor.respond_to?(:async)
        @actor.async.on_ping(message)
      else
        @actor.on_ping(message)
      end
    end
  }

  @on_error   = lambda { |error|
    log_to_file("#{@actor.class} received ERROR  #{error.inspect} #{error.backtrace}")
    if @actor.present? && @actor.respond_to?(:on_error)
      if @actor.respond_to?(:async)
        @actor.async.on_error(error)
      else
        @actor.on_error(error)
      end
    end
  }

  @on_message = lambda { |message|
    message = parse_json(message)
    log_to_file("#{@actor.class} websocket client received JSON  #{message}")
    if @actor.present? && @actor.respond_to?(:async)
      log_to_file("#{@actor.class} works async on message #{message.inspect}")
      @actor.async.on_message(message)
    else
      @actor.on_message(message)
    end
  }
  connect
end

Public Instance Methods

is_windows?() click to toggle source
# File lib/capistrano_sentinel/classes/websocket_client.rb, line 96
def is_windows?
  RbConfig::CONFIG['host_os'] =~ /mswin|mingw|cygwin/
end
publish(channel, data) click to toggle source

publishes to a channel some data (can be anything)

@param [string] channel @param [#to_s] data

@return [void]

@api public

# File lib/capistrano_sentinel/classes/websocket_client.rb, line 120
def publish(channel, data)
  send_action('publish', channel, data)
end
subscribe(channel, data = {}) click to toggle source

subscribes to a channel . need to be used inside the connect block passed to the actor

@param [string] channel

@return [void]

@api public

# File lib/capistrano_sentinel/classes/websocket_client.rb, line 107
def subscribe(channel, data = {})
  log_to_file("#{self.class} tries to subscribe to channel  #{channel} with #{data.inspect}")
  send_action('subscribe', channel, data)
end
unsubscribe(channel) click to toggle source

unsubscribes current client from a channel

@param [string] channel

@return [void]

@api public

# File lib/capistrano_sentinel/classes/websocket_client.rb, line 131
def unsubscribe(channel)
  send_action('unsubscribe', channel)
end
unsubscribe_all() click to toggle source

unsubscribes all clients from all channels

@return [void]

@api public

# File lib/capistrano_sentinel/classes/websocket_client.rb, line 151
def unsubscribe_all
  send_action('unsubscribe_all')
end
unsubscribe_clients(channel) click to toggle source

unsubscribes all clients subscribed to a channel

@param [string] channel

@return [void]

@api public

# File lib/capistrano_sentinel/classes/websocket_client.rb, line 142
def unsubscribe_clients(channel)
  send_action('unsubscribe_clients', channel)
end

Protected Instance Methods

chat(message) click to toggle source

method used to send messages to the webserver checks too see if the message is a hash and if it is it will transform it to JSON and send it to the webser otherwise will construct a JSON object that will have the key action with the value 'message“ and the key message witth the parameter's value

@param [Hash] message

@return [void]

@api private

# File lib/capistrano_sentinel/classes/websocket_client.rb, line 173
def chat(message)
  final_message = nil
  if message.is_a?(Hash)
    final_message = message.to_json
  else
    final_message = JSON.dump(action: 'message', message: message)
  end
  log_to_file("#{self.class} sends JSON #{final_message}")
  send_data(final_message)
end
connect() click to toggle source
# File lib/capistrano_sentinel/classes/websocket_client.rb, line 196
def connect
  tcp_socket = ::TCPSocket.new(@host, @port)
  tcp_socket.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_KEEPALIVE, true)
  tcp_socket.setsockopt(::Socket::SOL_TCP, ::Socket::TCP_KEEPIDLE, 50)
  tcp_socket.setsockopt(::Socket::SOL_TCP, ::Socket::TCP_KEEPINTVL, 50)
  tcp_socket.setsockopt(::Socket::SOL_TCP, ::Socket::TCP_KEEPCNT, 10)

  if @secure
    @socket = ::OpenSSL::SSL::SSLSocket.new(tcp_socket)
    @socket.connect
  else
    @socket = tcp_socket
  end
  perform_handshake
end
determine_message_type(message) click to toggle source
# File lib/capistrano_sentinel/classes/websocket_client.rb, line 302
def determine_message_type(message)
  log_to_file("#{self.class} tries to dispatch message #{message.inspect}")
  case message.type
  when :binary, :text
    fire_on_message(message.data)
  when :ping
    send_data(message.data, :pong) if @auto_pong
    fire_on_ping(message)
  when :pong
    fire_on_error(CapistranoSentinel::WsProtocolError.new('Invalid type pong received'))
  when :close
    fire_on_close(message)
  else
    fire_on_error(CapistranoSentinel::BadMessageTypeError.new("An unknown message type was received #{message.inspect}"))
  end
end
do_send(data, type=:text) click to toggle source
# File lib/capistrano_sentinel/classes/websocket_client.rb, line 319
def do_send(data, type=:text)
  frame = ::WebSocket::Frame::Outgoing::Client.new(:version => @protocol_version, :data => data, :type => type)
  begin
    @socket.write_nonblock frame
    @socket.flush
  rescue ::Errno::EPIPE => ce
    fire_on_error(ce)
    fire_on_close
  rescue => e
    fire_on_error(e)
  end
end
fire_on_close(message = nil) click to toggle source
# File lib/capistrano_sentinel/classes/websocket_client.rb, line 352
def fire_on_close(message = nil)
  log_to_file("#{self.class} tries to fire_on_close with #{message.inspect} ")
  @active = false
  @closed = true
  @on_close.call(message) if @on_close
  @socket.close unless @socket.closed?

  reconnect if @reconnect
end
fire_on_error(error) click to toggle source
# File lib/capistrano_sentinel/classes/websocket_client.rb, line 347
def fire_on_error(error)
  log_to_file("#{self.class} tries to on_error with #{error.inspect} ")
  @on_error.call(error) if @on_error
end
fire_on_message(message) click to toggle source
# File lib/capistrano_sentinel/classes/websocket_client.rb, line 337
def fire_on_message(message)
  log_to_file("#{self.class} tries to fire_on_message #{message.inspect}")
  @on_message.call(message) if @on_message
end
fire_on_open() click to toggle source
# File lib/capistrano_sentinel/classes/websocket_client.rb, line 342
def fire_on_open
  log_to_file("#{self.class} tries to on_open ")
  @on_open.call() if @on_open
end
fire_on_ping(message) click to toggle source
# File lib/capistrano_sentinel/classes/websocket_client.rb, line 332
def fire_on_ping(message)
  log_to_file("#{self.class} tries to ping #{message.inspect}")
  @on_ping.call(message) if @on_ping
end
init_messaging() click to toggle source

Use one thread to perform blocking read on the socket

# File lib/capistrano_sentinel/classes/websocket_client.rb, line 276
def init_messaging
  @read_thread = Thread.new { read_loop }
end
perform_handshake() click to toggle source
# File lib/capistrano_sentinel/classes/websocket_client.rb, line 230
def perform_handshake
  handshake = ::WebSocket::Handshake::Client.new({
    :host   => @host,
    :port   => @port,
    :secure => @secure,
    :path   => @path,
    :query  => @query
    })

    @socket.write handshake.to_s
    buf = ''

    loop do
      begin
        if handshake.finished?
          @protocol_version = handshake.version
          @active = true
          @opened = true
          log_to_file("#{self.class} got handshake finished ")
          init_messaging
          fire_on_open
          break
        else
          # do non blocking reads on headers - 1 byte at a time
          buf.concat(@socket.read_nonblock(1))
          # \r\n\r\n i.e. a blank line, separates headers from body
          if idx = buf.index(/\r\n\r\n/m)
            handshake << buf # parse headers

            if handshake.finished? && !handshake.valid?
              fire_on_error(CapistranoSentinel::ConnectError.new('Server responded with an invalid handshake'))
              fire_on_close #close if handshake is not valid
              break
            end
          end
        end
      rescue ::IO::WaitReadable => e
        #log_to_file("#{self.class} got WaitReadable #{e.inspect}")
      rescue ::IO::WaitWritable => e
        #log_to_file("#{self.class} got WaitWritable #{e.inspect}")
        # ignored
      end
    end
  end
read_loop() click to toggle source
# File lib/capistrano_sentinel/classes/websocket_client.rb, line 280
def read_loop
  frame = ::WebSocket::Frame::Incoming::Client.new(:version => @protocol_version)
  loop do
    begin
      frame << @socket.readpartial(@read_buffer_size)
      while message = frame.next
        #"text", "binary", "ping", "pong" and "close" (according to websocket/base.rb)
        determine_message_type(message)
      end
      fire_on_error CapistranoSentinel::WsProtocolError.new(frame.error) if frame.error?
    rescue => e
      log_to_file("#{self.class} crashed with #{e.inspect} #{e.backtrace}")
      fire_on_error(e)
      if @socket.closed? || @socket.eof?
        @read_thread = nil
        fire_on_close
        break
      end
    end
  end
end
send_action(action, channel = nil, data = {}) click to toggle source
# File lib/capistrano_sentinel/classes/websocket_client.rb, line 158
def send_action(action, channel = nil, data = {})
  data = data.is_a?(Hash) ? data : {}
  publishing_data = { 'client_action' => action, 'channel' => channel, 'data' => data }.reject { |_key, value| value.blank? }
  chat(publishing_data)
end
send_data(data, type = :text) click to toggle source

Send the data given by the data param if running on a posix system this uses Ruby's fork method to send if on windows fork won't be attempted.

data

the data to send

type

:text or :binary, defaults to :text

# File lib/capistrano_sentinel/classes/websocket_client.rb, line 190
def send_data(data, type = :text)
  pid = Thread.new do
    do_send(data, type)
  end
end