class CitrusRpc::RpcServer::WsAcceptor

WsAcceptor

Public Class Methods

new(args={}) click to toggle source

Create a new websocket acceptor

@param [Hash] args Options

@option args [Boolean] buffer_msg @option args [Integer] interval

# File lib/citrus-rpc/rpc-server/acceptors/ws_acceptor.rb, line 24
def initialize args={}, &block
  @buffer_msg = args[:buffer_msg]
  @interval = args[:interval]

  @server = nil
  @wss = {}

  @msg_queues = {}
  @callback = block

  @listening = false
  @closed = false
end

Public Instance Methods

close() click to toggle source

Close the acceptor

# File lib/citrus-rpc/rpc-server/acceptors/ws_acceptor.rb, line 79
def close
  return unless @listening && !@closed
  EM.stop_server @server
  @closed = true
  emit :closed
end
listen(port) click to toggle source

Listen on port

@param [Integer] port

# File lib/citrus-rpc/rpc-server/acceptors/ws_acceptor.rb, line 41
def listen port
  raise RuntimeError 'acceptor double listen' if @listening

  begin
    @server = WebSocket::EventMachine::Server.start(:host => '0.0.0.0', :port => port) { |ws|
      ws.onopen {
        @wss[ws.signature] = ws
        peer_port, peer_host = Socket.unpack_sockaddr_in ws.get_peername
        emit :connection, { :id => ws.signature, :ip => peer_host }
      }

      ws.onmessage { |msg, type|
        begin
          pkg = JSON.parse msg
          if pkg.instance_of? Array
            process_msgs ws, pkg
          else
            process_msg ws, pkg
          end
        rescue => err
        end
      }

      ws.onclose {
        @wss.delete ws.signature
        @msg_queues.delete ws.signature
      }
      ws.onerror { |err| emit :error, err }
    }
  rescue => err
    emit :error, err
  end

  on(:connection) { |obj| ip_filter obj }
  @listening = true
end

Private Instance Methods

clone_error(origin) click to toggle source

Clone error

@private

# File lib/citrus-rpc/rpc-server/acceptors/ws_acceptor.rb, line 91
def clone_error origin
  { 'msg' => origin.message, 'stack' => nil }
end
enqueue(ws, resp) click to toggle source

Enqueue the response

@param [Object] ws

@private

# File lib/citrus-rpc/rpc-server/acceptors/ws_acceptor.rb, line 131
def enqueue ws, resp
end
ip_filter(obj) click to toggle source

ip filter

@param [Object] obj

@private

# File lib/citrus-rpc/rpc-server/acceptors/ws_acceptor.rb, line 139
def ip_filter obj
end
process_msg(ws, pkg) click to toggle source

Process message

@param [Object] ws @param [Hash] pkg

@private

# File lib/citrus-rpc/rpc-server/acceptors/ws_acceptor.rb, line 101
def process_msg ws, pkg
  @callback.call pkg['msg'] { |*args|
    args.each_with_index { |arg, index|
      args[index] = clone_error arg if arg.is_a? Exception
    }

    resp = { 'id' => pkg['id'], 'resp' => args }
    if @buffer_msg
      enqueue ws, resp
    else
      ws.send resp.to_json
    end
  }
end
process_msgs(ws, pkgs) click to toggle source

Batch version for process_msg

@param [Object] ws @param [Array] pkgs

@private

# File lib/citrus-rpc/rpc-server/acceptors/ws_acceptor.rb, line 122
def process_msgs ws, pkgs
  pkgs.each { |pkg| process_msg ws, pkg }
end