class CitrusRpc::RpcClient::WsMailBox

WsMailBox

Public Class Methods

new(server, args={}) click to toggle source

Create a new websocket mailbox

@param [Hash] server server info @param [Hash] args Options

@option args [Object] context @option args [Object] route_context @option args [#call] router @option args [String] router_type

# File lib/citrus-rpc/rpc-client/mailboxes/ws_mailbox.rb, line 28
def initialize server, args={}
  @cur_id = 0

  @id = server[:id]
  @host = server[:host]
  @port = server[:port]

  @requests = {}
  @timeout = {}
  @queue = []

  @buffer_msg = args[:buffer_msg]
  @interval = args[:interval] || Constants::DefaultParams::Interval
  @timeout_value = args[:timeout] || Constants::DefaultParams::Timeout

  @connected = false
  @closed = false

  @args = args
end

Public Instance Methods

close() click to toggle source

Close the mail box

# File lib/citrus-rpc/rpc-client/mailboxes/ws_mailbox.rb, line 79
def close
  return if @closed
  @closed = true
  @ws.close
end
connect() { |exception 'mailbox has already connected'| ... } click to toggle source

Connect to remote server

# File lib/citrus-rpc/rpc-client/mailboxes/ws_mailbox.rb, line 50
def connect
  if @connected
    block_given? and yield Exception.new 'mailbox has already connected'
    return
  end

  begin
    @ws = WebSocket::EventMachine::Client.connect :uri => 'ws://' + @host + ':' + @port.to_s
    @ws.onopen {
      return if @connected
      @connected = true
      @timer = EM.add_periodic_timer(@interval) { flush } if @buffer_msg
      block_given? and yield
    }

    @ws.onmessage { |msg, type|
      process_msg msg, type
    }

    @ws.onerror { |err| }
    @ws.onclose { |code, reason|
      emit :close, @id
    }
  rescue => err
    block_given? and yield err
  end
end
send(msg, opts, block) click to toggle source

Send message to remote server

@param [Hash] msg @param [Hash] opts @param [#call] block

# File lib/citrus-rpc/rpc-client/mailboxes/ws_mailbox.rb, line 90
def send msg, opts, block
  unless @connected
    block.call Exception.new 'websocket mailbox has not connected'
    return
  end

  if @closed
    block.call Exception.new 'websocket mailbox has already closed'
    return
  end

  id = @cur_id
  @cur_id += 1
  @requests[id] = block

  pkg = { :id => id, :msg => msg }
  if @buffer_msg
    enqueue pkg
  else
    @ws.send pkg.to_json
  end
end

Private Instance Methods

enqueue(pkg) click to toggle source

Enqueue the package

@param [Hash] pkg

@private

# File lib/citrus-rpc/rpc-client/mailboxes/ws_mailbox.rb, line 120
def enqueue pkg
  @queue << pkg
end
flush() click to toggle source

Flush

@private

# File lib/citrus-rpc/rpc-client/mailboxes/ws_mailbox.rb, line 127
def flush
  if @closed || @queue.length == 0
    return
  end
  @ws.send @queue.to_json
  @queue = []
end
process_msg(msg, type) click to toggle source

Process message

@param [Hash] msg

@private

# File lib/citrus-rpc/rpc-client/mailboxes/ws_mailbox.rb, line 140
def process_msg msg, type
  begin
    pkg = JSON.parse msg
    pkg_id = pkg['id']
    pkg_resp = pkg['resp']

    return unless block = @requests[pkg_id]
    @requests.delete pkg_id

    args = [nil]
    pkg_resp.each { |arg| args << arg }

    block.call *args
  rescue => err
  end
end
process_msgs() click to toggle source

Batch version for process_msg

@private

# File lib/citrus-rpc/rpc-client/mailboxes/ws_mailbox.rb, line 160
def process_msgs
end