class CitrusAdmin::MasterAgent

MasterAgent

Public Class Methods

new(args={}) click to toggle source

Create a new master agent

@param [Hash] args Options

@option args [Object] :console_service @option args [Array] :white_list

# File lib/citrus-admin/master_agent.rb, line 19
def initialize args={}
  @console_service = args[:console_service]
  @servers = {}
  @servers_map = {}
  @slaves_map = {}
  @clients = {}
  @req_id = 1
  @callbacks = {}
  @wss = {}
  @white_list = args[:white_list]
  @state = :state_inited
end

Public Instance Methods

broadcast_command(command, module_id, msg) click to toggle source

Broadcast command

@param [String] command @param [String] module_id @param [Object] msg

# File lib/citrus-admin/master_agent.rb, line 239
def broadcast_command command, module_id, msg
  return if @state != :state_started
  broadcast_command_msg @servers.values, command, module_id, msg
end
broadcast_notify(module_id, msg) click to toggle source

Broadcast notify

@param [String] module_id @param [Object] msg

# File lib/citrus-admin/master_agent.rb, line 229
def broadcast_notify module_id, msg
  return if @state != :state_started
  broadcast_notify_msg @servers.values, module_id, msg
end
close() click to toggle source

Close the agent

# File lib/citrus-admin/master_agent.rb, line 98
def close
  return unless @state == :state_started
  @state = :state_closed
  EM.stop_server @server
end
get_client_by_id(client_id) click to toggle source

Get client by id

@param [String] client_id

# File lib/citrus-admin/master_agent.rb, line 107
def get_client_by_id client_id
  @clients[client_id]
end
listen(port) { || ... } click to toggle source

Listen to a port and handle register and request

@param [Integer] port

# File lib/citrus-admin/master_agent.rb, line 35
def listen port, &block
  if @state != :state_inited
    return
  end
  @state = :state_started
  begin
    @server = WebSocket::EventMachine::Server.start(:host => '0.0.0.0', :port => port.to_s) { |ws|
      ws_context = {
        # 'monitor' or 'client'
        :type => nil,
        # for monitor connection
        :server_id => nil, :server_type => nil, :server_info => nil,
        # for client connection
        :client_id => nil,
        # for both connection
        :username => nil, :registered => false
      }
      ws.onopen {
        @wss[ws.signature] = ws
        peer_port, peer_host = Socket.unpack_sockaddr_in ws.get_peername
        emit 'connection', { :id => ws.signature, :ip => peer_host }
      }
      ws.onmessage { |msg, type|
        begin
          event, msg = parse msg
          case event
          when 'register'
            process_register_msg ws, ws_context, msg
          when 'monitor'
            process_msg_from_monitor ws, ws_context, msg
          when 'client'
            process_msg_from_client ws, ws_context, msg
          else
          end
        rescue => err
        end
      }
      ws.onclose {
        @wss.delete ws.signature
        if ws_context[:registered]
          case ws_context[:type]
          when 'monitor'
            remove_monitor_connection(ws_context[:server_id],
              ws_context[:server_type], ws_context[:server_info])
          when 'client'
            remove_client_connection ws_context[:client_id]
          else
          end
          emit 'disconnect', ws_context
        end
      }
      ws.onerror { |err|
        emit 'err', err
      }
    }
    block_given? && yield
  rescue => err
    emit 'error', err
  end
  on('connection') { |obj| ip_filter obj }
end
notify(server_id, module_id, msg) click to toggle source

Notify server by server id

@param [String] server_id @param [String] module_id @param [Object] msg

# File lib/citrus-admin/master_agent.rb, line 162
def notify server_id, module_id, msg
  return if @state != :state_started
  if !server = @servers[server_id]
    return false
  end
  send_msg_to_monitor server[:ws], nil, module_id, msg
  return true
end
notify_by_server(server_id, server_info, module_id, msg) click to toggle source

Notify server by server

@param [String] server_id @param [Object] server_info @param [String] module_id @param [Object] msg

# File lib/citrus-admin/master_agent.rb, line 177
def notify_by_server server_id, server_info, module_id, msg
  return if @state != :state_started
  if !server = @servers[server_id]
    return false
  end
  if Utils.compare_server server[:server_info], server_info
    send_msg_to_monitor server[:ws], nil, module_id, msg
  else
    @slaves_map[server_id].each { |server|
      if Utils.compare_server server[:server_info], server_info
        send_msg_to_monitor server[:ws], nil, module_id, msg
        break
      end
    }
  end
  return true
end
notify_by_server_type(server_type, module_id, msg) click to toggle source

Notify by server type

@param [String] server_type @param [String] module_id @param [Object] msg

# File lib/citrus-admin/master_agent.rb, line 200
def notify_by_server_type server_type, module_id, msg
  return if @state != :state_started
  servers = @servers_map[server_type]
  if !servers || servers.empty?
    return false
  end
  broadcast_notify_msg servers, module_id, msg
  return true
end
notify_client(client_id, module_id, msg) click to toggle source

Notify client

@param [String] client_id @param [String] module_id @param [Object] msg

# File lib/citrus-admin/master_agent.rb, line 249
def notify_client client_id, module_id, msg
  return if @state != :state_started
  if !client = @clients[client_id]
    return
  end
  send_msg_to_client client[:ws], nil, module_id, msg
end
notify_to_slaves(server_id, module_id, msg) click to toggle source

Notify to slaves

@param [String] server_id @param [String] module_id @param [Object] msg

# File lib/citrus-admin/master_agent.rb, line 215
def notify_to_slaves server_id, module_id, msg
  return if @state != :state_started
  servers = @slaves_map[server_id]
  if !servers || servers.empty?
    return false
  end
  broadcast_notify_msg servers, module_id, msg
  return true
end
request(server_id, module_id, msg, block) click to toggle source

Request by server id

@param [String] server_id @param [String] module_id @param [Object] msg @param [#call] block

# File lib/citrus-admin/master_agent.rb, line 117
def request server_id, module_id, msg, block
  return if @state != :state_started
  if !server = @servers[server_id]
    block.call Exception.new 'unknown server id: ' + server_id
    return
  end
  req_id = @req_id
  @req_id += 1
  @callbacks[req_id] = block
  send_msg_to_monitor server[:ws], req_id, module_id, msg
end
request_by_server(server_id, server_info, module_id, msg, block) click to toggle source

Request by server

@param [String] server_id @param [Object] server_info @param [String] module_id @param [Object] msg @param [#call] block

# File lib/citrus-admin/master_agent.rb, line 136
def request_by_server server_id, server_info, module_id, msg, block
  return if @state != :state_started
  if !server = @servers[server_id]
    block.call Exception.new 'unknown server id: ' + server_id
    return
  end
  req_id = @req_id
  @req_id += 1
  @callbacks[req_id] = block
  if Utils.compare_server server[:server_info], server_info
    send_msg_to_monitor server[:ws], req_id, module_id, msg
  else
    @slaves_map[server_id].each { |server|
      if Utils.compare_server server[:server_info], server_info
        send_msg_to_monitor server[:ws], req_id, module_id, msg
        break
      end
    }
  end
end

Private Instance Methods

add_client_connection(client_id, user, ws) click to toggle source

Add client connection

@param [String] client_id @param [Object] user @param [Object] ws

@private

# File lib/citrus-admin/master_agent.rb, line 413
def add_client_connection client_id, user, ws
  client = {
    :client_id => client_id,
    :user_info => user,
    :ws => ws
  }
  @clients[client_id] = client
  client
end
add_monitor_connection(server_id, server_type, server_info, pid, ws) click to toggle source

Add server connection

@param [String] server_id @param [String] server_type @param [Object] server_info @param [Integer] pid @param [Obejct] ws

@private

# File lib/citrus-admin/master_agent.rb, line 387
def add_monitor_connection server_id, server_type, server_info, pid, ws
  server = {
    :server_id => server_id,
    :server_type => server_type,
    :server_info => server_info,
    :pid => pid,
    :ws => ws
  }
  if !@servers[server_id]
    @servers[server_id] = server
    @servers_map[server_type] ||= []
    @servers_map[server_type] << server
  else
    @slaves_map[server_id] ||= []
    @slaves_map[server_id] << server
  end
  server
end
broadcast_command_msg(servers, command, module_id, msg) click to toggle source

Broadcast command message

@param [Array] servers @param [String] command @param [String] module_id @param [Object] msg

@private

# File lib/citrus-admin/master_agent.rb, line 510
def broadcast_command_msg servers, command, module_id, msg
  msg = compose_command nil, command, module_id, msg
  servers.each { |server|
    server[:ws].send ['monitor', msg].to_json
  }
end
broadcast_notify_msg(servers, module_id, msg) click to toggle source

Broadcast notify message

@param [Array] servers @param [String] module_id @param [Object] msg

@private

# File lib/citrus-admin/master_agent.rb, line 495
def broadcast_notify_msg servers, module_id, msg
  msg = compose_request nil, module_id, msg
  servers.each { |server|
    server[:ws].send ['monitor', msg].to_json
  }
end
do_auth_server(msg, ws) { |exception 'server auth failed'| ... } click to toggle source

Do auth server

@param [Object] msg @param [Object] ws

@private

# File lib/citrus-admin/master_agent.rb, line 523
def do_auth_server msg, ws, &block
  if !block_given?
    raise ArgumentError 'expected a code block'
  end
  @console_service.auth_server.call(msg, @console_service.env) { |res|
    if res != 'ok'
      ws.send ['register', {
        :code => PRO_FAIL,
        :msg => 'server auth failed'
      }].to_json
      yield Exception.new 'server auth failed'
      return
    end
    add_monitor_connection msg[:server_id], msg[:server_type], msg[:server_info], msg[:pid], ws
    ws.send ['register', {
      :code => PRO_OK,
      :msg => 'ok'
    }].to_json

    if msg[:server_info]
      msg[:server_info][:pid] = msg[:pid]
      emit 'register', msg[:server_info]
    end

    yield nil
  }
end
do_auth_user(msg, ws) { |exception 'client should have a client id'| ... } click to toggle source

Do auth user

@param [Object] msg @param [Object] ws

@private

# File lib/citrus-admin/master_agent.rb, line 557
def do_auth_user msg, ws, &block
  if !block_given?
    raise ArgumentError 'expected a code block'
  end
  if !client_id = msg[:client_id]
    yield Exception.new 'client should have a client id'
    return
  end
  if !username = msg[:username]
    ws.send ['register', {
      :code => PRO_FAIL,
      :msg => 'client should auth with username'
    }].to_json
    yield Exception.new 'client should auth with username'
    return
  end
  @console_service.auth_user.call(msg, @console_service.env) { |user|
    if !user
      ws.send ['register', {
        :code => PRO_FAIL,
        :msg => 'client auth failed'
      }].to_json
      yield Exception.new 'client auth failed'
      return
    end
    if @clients[client_id]
      ws.send ['register', {
        :code => PRO_FAIL,
        :msg => 'client id has already been registered'
      }].to_json
      yield Exception.new 'client id has already been registered'
      return
    end
    add_client_connection client_id, user, ws
    ws.send ['register', {
      :code => PRO_OK,
      :msg => 'ok'
    }].to_json
    yield nil
  }
end
ip_filter(obj) click to toggle source

ip filter

@param [Object] obj

@private

# File lib/citrus-admin/master_agent.rb, line 604
def ip_filter obj
end
process_msg_from_client(ws, ws_context, msg) click to toggle source

Process message from client

@param [Object] ws @param [Object] ws_context @param [Object] msg

@private

# File lib/citrus-admin/master_agent.rb, line 345
def process_msg_from_client ws, ws_context, msg
  if !ws_context[:registered]
    ws.close
    return
  end
  if ws_context[:type] != 'client'
    return
  end
  if msg[:command]
    # a command from client
    @console_service.command(msg[:command], msg[:module_id], msg[:body]) { |err, res|
      if is_request? msg
        if resp = compose_response(msg, err, res)
          ws.send ['client', resp].to_json
        end
      else
        # notify should not have a callback
      end
    }
  else
    # a request or a notify from client
    @console_service.execute(msg[:module_id], :client_handler, msg[:body]) { |err, res|
      if is_request? msg
        if resp = compose_response(msg, err, res)
          ws.send ['client', resp].to_json
        end
      else
        # notify should not have a callback
      end
    }
  end
end
process_msg_from_monitor(ws, ws_context, msg) click to toggle source

Process message from monitor

@param [Object] ws @param [Object] ws_context @param [Object] msg

@private

# File lib/citrus-admin/master_agent.rb, line 308
def process_msg_from_monitor ws, ws_context, msg
  if !ws_context[:registered]
    ws.close
    return
  end
  if ws_context[:type] != 'monitor'
    return
  end
  if resp_id = msg[:resp_id]
    # response from monitor
    callback = @callbacks[resp_id]
    if !callback
      return
    end
    @callbacks.delete resp_id
    callback.call msg[:err], msg[:body]
    return
  end
  # request or notify from monitor
  @console_service.execute(msg[:module_id], :master_handler, msg[:body]) { |err, res|
    if is_request? msg
      if resp = compose_response(msg, err, res)
        ws.send ['monitor', resp].to_json
      end
    else
      # notify should not have a callback
    end
  }
end
process_register_msg(ws, ws_context, msg) click to toggle source

Process register message

@param [Object] ws @param [Object] ws_context @param [Object] msg

@private

# File lib/citrus-admin/master_agent.rb, line 266
def process_register_msg ws, ws_context, msg
  return unless msg && msg[:type]
  case msg[:type]
  when 'monitor'
    return unless msg[:server_id]
    ws_context[:type] = msg[:type]
    ws_context[:server_id] = msg[:server_id]
    ws_context[:server_type] = msg[:server_type]
    ws_context[:server_info] = msg[:server_info]
    do_auth_server(msg, ws) { |err|
      if err
        ws.close
        return
      end
      ws_context[:registered] = true
    }
  when 'client'
    ws_context[:type] = msg[:type]
    do_auth_user(msg, ws) { |err|
      if err
        ws.close
        return
      end
      ws_context[:username] = msg[:username]
      ws_context[:registered] = true
    }
  else
    ws.send ['register', {
      :code => PRO_FAIL,
      :msg => 'unknown auth type'
    }].to_json
    ws.close
  end
end
remove_client_connection(client_id) click to toggle source

Remove client connection

@param [String] client_id

@private

# File lib/citrus-admin/master_agent.rb, line 458
def remove_client_connection client_id
  @clients.delete ws_context[:client_id]
end
remove_monitor_connection(server_id, server_type, server_info) click to toggle source

Remove monitor connection

@param [String] server_id @param [String] server_type @param [Object] server_info

@private

# File lib/citrus-admin/master_agent.rb, line 430
def remove_monitor_connection server_id, server_type, server_info
  # if Utils.compare_server @servers[server_id][:server_info], server_info
  #   @servers.delete server_id
  #   if @servers_map[server_type]
  #     @servers_map[server_type].delete_if { |server|
  #       server[:server_id] == server_id
  #     }
  #     if @servers_map[server_type].empty?
  #       @servers_map.delete server_type
  #     end
  #   end
  # else
  #   if @slaves_map[server_id]
  #     @slaves_map[server_id].delete_if { |server|
  #       Utils.compare_server server[:server_info], server_info
  #     }
  #     if @slaves_map[server_id].empty?
  #       @slaves_map.delete server_id
  #     end
  #   end
  # end
end
send_msg_to_client(ws, req_id, module_id, msg) click to toggle source

Send message to client

@param [Object] ws @param [Integer] req_id @param [String] module_id @param [Object] msg

@private

# File lib/citrus-admin/master_agent.rb, line 483
def send_msg_to_client ws, req_id, module_id, msg
  msg = compose_request req_id, module_id, msg
  ws.send ['client', msg].to_json
end
send_msg_to_monitor(ws, req_id, module_id, msg) click to toggle source

Send message to monitor

@param [Object] ws @param [Integer] req_id @param [String] module_id @param [Object] msg

@private

# File lib/citrus-admin/master_agent.rb, line 470
def send_msg_to_monitor ws, req_id, module_id, msg
  msg = compose_request req_id, module_id, msg
  ws.send ['monitor', msg].to_json
end