class CitrusRpc::RpcServer::WsAcceptor
Public Class Methods
new(args={})
click to toggle source
Create a new websocket acceptor
@param [Hash] args Options
@option args [Boolean] buffer_msg @option args [Integer] interval
# File lib/citrus-rpc/rpc-server/acceptors/ws_acceptor.rb, line 24 def initialize args={}, &block @buffer_msg = args[:buffer_msg] @interval = args[:interval] @server = nil @wss = {} @msg_queues = {} @callback = block @listening = false @closed = false end
Public Instance Methods
close()
click to toggle source
Close the acceptor
# File lib/citrus-rpc/rpc-server/acceptors/ws_acceptor.rb, line 79 def close return unless @listening && !@closed EM.stop_server @server @closed = true emit :closed end
listen(port)
click to toggle source
Listen on port
@param [Integer] port
# File lib/citrus-rpc/rpc-server/acceptors/ws_acceptor.rb, line 41 def listen port raise RuntimeError 'acceptor double listen' if @listening begin @server = WebSocket::EventMachine::Server.start(:host => '0.0.0.0', :port => port) { |ws| ws.onopen { @wss[ws.signature] = ws peer_port, peer_host = Socket.unpack_sockaddr_in ws.get_peername emit :connection, { :id => ws.signature, :ip => peer_host } } ws.onmessage { |msg, type| begin pkg = JSON.parse msg if pkg.instance_of? Array process_msgs ws, pkg else process_msg ws, pkg end rescue => err end } ws.onclose { @wss.delete ws.signature @msg_queues.delete ws.signature } ws.onerror { |err| emit :error, err } } rescue => err emit :error, err end on(:connection) { |obj| ip_filter obj } @listening = true end
Private Instance Methods
clone_error(origin)
click to toggle source
Clone error
@private
# File lib/citrus-rpc/rpc-server/acceptors/ws_acceptor.rb, line 91 def clone_error origin { 'msg' => origin.message, 'stack' => nil } end
enqueue(ws, resp)
click to toggle source
Enqueue the response
@param [Object] ws
@private
# File lib/citrus-rpc/rpc-server/acceptors/ws_acceptor.rb, line 131 def enqueue ws, resp end
ip_filter(obj)
click to toggle source
ip filter
@param [Object] obj
@private
# File lib/citrus-rpc/rpc-server/acceptors/ws_acceptor.rb, line 139 def ip_filter obj end
process_msg(ws, pkg)
click to toggle source
Process message
@param [Object] ws @param [Hash] pkg
@private
# File lib/citrus-rpc/rpc-server/acceptors/ws_acceptor.rb, line 101 def process_msg ws, pkg @callback.call pkg['msg'] { |*args| args.each_with_index { |arg, index| args[index] = clone_error arg if arg.is_a? Exception } resp = { 'id' => pkg['id'], 'resp' => args } if @buffer_msg enqueue ws, resp else ws.send resp.to_json end } end
process_msgs(ws, pkgs)
click to toggle source
Batch version for process_msg
@param [Object] ws @param [Array] pkgs
@private
# File lib/citrus-rpc/rpc-server/acceptors/ws_acceptor.rb, line 122 def process_msgs ws, pkgs pkgs.each { |pkg| process_msg ws, pkg } end