class CitrusRpc::RpcClient::Client

Client

@example

Create a new rpc client

client = CitrusRpc::RpcClient::Client.new

Add a proxy

dirname = File.expand_path File.dirname(__FILE__)
client.add_proxy(
  :namespace => 'user',
  :server_type => 'test',
  :path => dirname + '/remote/test' # remote service interface path
)

Add a remote server

client.add_server(
  :server_id => 'test-server-1',
  :server_type => 'test',
  :host => '127.0.0.1',
  :port => 3333
)

Do the rpc invoke

client.start do |err|
  client.proxies.sys.connector.WhoAmIRemote.do(nil, 'hello') do |err, resp|
    ...
  end
end

Attributes

proxies[R]

Public Class Methods

new(args={}) click to toggle source

Create a new rpc client

@param [Hash] args Options

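@option args [Object] context
@option args [Object] route_context passed to the custom router as its third argument
@option args [#call] router custom router; defaults to the df_route method
@option args [Symbol] router_type built-in router to use instead of the custom router (:roundrobin, :weight_roundrobin, :least_active or :consistent_hash; any other value selects rd_route)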

# File lib/citrus-rpc/rpc-client/client.rb, line 64
# Build a new rpc client in the :state_inited state.
#
# The whole options hash is kept in @args and is also handed to the
# MailStation that this client delegates server management to.
#
# @param [Hash] args Options
#
# @option args [Object] context
# @option args [Object] route_context
# @option args [#call] router custom router; the df_route method is used when omitted
# @option args [Object] router_type
def initialize(args = {})
  @args = args

  @context, @route_context = @args[:context], @args[:route_context]

  @router_type = @args[:router_type]
  @router = @args[:router] || method(:df_route)

  @station = MailStation.new(args)
  @proxies = OpenStruct.new

  @state = :state_inited
end

Public Instance Methods

add_proxies(records) click to toggle source

Batch version for add_proxy

@param [Array] records

# File lib/citrus-rpc/rpc-client/client.rb, line 123
# Register every record of a batch through add_proxy, in order.
#
# Nothing happens (and nil is returned) when records is nil or empty.
#
# @param [Array] records
def add_proxies(records)
  return unless records && records.length > 0

  records.each do |record|
    add_proxy(record)
  end
end
add_proxy(record) click to toggle source

Add a new proxy to the rpc client, overriding any proxy already registered under the same namespace and server type

@param [Hash] record

# File lib/citrus-rpc/rpc-client/client.rb, line 111
# Generate the proxy described by record and store it in @proxies under
# the record's :namespace and :server_type.
#
# Returns nil without storing anything when record is missing or when
# generate_proxy yields no proxy.
#
# @param [Hash] record
def add_proxy(record)
  generated = record && generate_proxy(record)
  return unless generated

  insert_proxy(@proxies, record[:namespace], record[:server_type], generated)
end
add_server(server) click to toggle source

Add new remote server to the rpc client

@param [Hash] server

# File lib/citrus-rpc/rpc-client/client.rb, line 132
# Register a remote server by delegating to the mail station.
#
# @param [Hash] server
def add_server(server)
  @station.add_server(server)
end
add_servers(servers) click to toggle source

Batch version for add new remote server

@param [Array] servers

# File lib/citrus-rpc/rpc-client/client.rb, line 139
# Register several remote servers at once by delegating to the mail station.
#
# @param [Array] servers
def add_servers(servers)
  @station.add_servers(servers)
end
after(filter) click to toggle source

Add rpc after filter

@param [#call] filter

# File lib/citrus-rpc/rpc-client/client.rb, line 186
# Register an rpc after filter by delegating to the mail station.
#
# @param [#call] filter
def after(filter)
  @station.after(filter)
end
before(filter) click to toggle source

Add rpc before filter

@param [#call] filter

# File lib/citrus-rpc/rpc-client/client.rb, line 179
# Register an rpc before filter by delegating to the mail station.
#
# @param [#call] filter
def before(filter)
  @station.before(filter)
end
filter(filter) click to toggle source

Add rpc filter

@param [#call] filter

# File lib/citrus-rpc/rpc-client/client.rb, line 193
# Register an rpc filter by delegating to the mail station.
#
# @param [#call] filter
def filter(filter)
  @station.filter(filter)
end
remove_server(id) click to toggle source

Remove remote server from the rpc client

@param [String] id

# File lib/citrus-rpc/rpc-client/client.rb, line 146
# Unregister a remote server by delegating to the mail station.
#
# @param [String] id
def remove_server(id)
  @station.remove_server(id)
end
remove_servers(ids) click to toggle source

Batch version for remove remote server

@param [Array] ids

# File lib/citrus-rpc/rpc-client/client.rb, line 153
# Unregister several remote servers at once by delegating to the mail station.
#
# @param [Array] ids
def remove_servers(ids)
  @station.remove_servers(ids)
end
replace_servers(servers) click to toggle source

Replace remote servers

@param [Array] servers

# File lib/citrus-rpc/rpc-client/client.rb, line 160
# Replace the registered remote servers by delegating to the mail station.
#
# @param [Array] servers
def replace_servers(servers)
  @station.replace_servers(servers)
end
rpc_invoke(server_id, msg) { |exception 'fail to do rpc invoke for client is not running'| ... } click to toggle source

Do the rpc invoke directly

@param [String] server_id @param [Hash] msg

# File lib/citrus-rpc/rpc-client/client.rb, line 168
# Send an rpc message directly to the given server.
#
# While the client is not in the :state_started state nothing is
# dispatched; the block (if any) receives an Exception and nil is returned.
# Otherwise the call is handed to the mail station together with the
# client options and the block.
#
# @param [String] server_id
# @param [Hash] msg
# @yield [err] called with an Exception when the client is not running
def rpc_invoke(server_id, msg, &block)
  if @state == :state_started
    @station.dispatch(server_id, msg, @args, block)
  else
    yield Exception.new('fail to do rpc invoke for client is not running') if block_given?
    nil
  end
end
set_error_handler(handler) click to toggle source

Set rpc filter error handler

@param [#call] handler

# File lib/citrus-rpc/rpc-client/client.rb, line 200
# Install the rpc filter error handler on the mail station.
#
# @param [#call] handler
def set_error_handler(handler)
  @station.error_handler = handler
end
start() { |exception 'rpc client has started'| ... } click to toggle source

Start the rpc client which would try to connect the remote servers

# File lib/citrus-rpc/rpc-client/client.rb, line 80
# Start the rpc client by starting the mail station.
#
# The client only starts from the :state_inited state; otherwise the block
# (if any) receives an Exception and nothing else happens. Once the station
# reports back, the block is called with the station's error, or with no
# argument after the state has been switched to :state_started.
#
# @yield [err] err is nil on success
def start
  unless @state == :state_inited
    yield Exception.new('rpc client has started') if block_given?
    return
  end

  @station.start do |err|
    # No `return` in here: this block may run after #start has already
    # returned (where `return` raises LocalJumpError), and when run
    # synchronously `return` would unwind out of the station's own #start.
    if err
      yield err if block_given?
    else
      @state = :state_started
      yield if block_given?
    end
  end
end
stop(force=false) click to toggle source

Stop the rpc client

@param [Boolean] force

# File lib/citrus-rpc/rpc-client/client.rb, line 99
# Stop the rpc client.
#
# Only a client in the :state_started state is stopped: its state becomes
# :state_closed and the mail station is stopped with the given flag.
# In any other state this is a no-op returning nil.
#
# @param [Boolean] force
def stop(force = false)
  return unless @state == :state_started

  @state = :state_closed
  @station.stop(force)
end

Private Instance Methods

generate_proxy(record) click to toggle source

Generate proxies for remote servers

@param [Hash] record

@private

# File lib/citrus-rpc/rpc-client/client.rb, line 211
# Build one proxy per remote service found under record[:path].
#
# The remotes are obtained from load_app_remote; each one is wrapped with
# create_proxy, which receives the remote, its service name, the record
# itself (as :attach) and the proxy_cb method as callback.
#
# @param [Hash] record
# @return [OpenStruct] proxies keyed by service name
#
# @private
def generate_proxy(record)
  proxies = OpenStruct.new

  load_app_remote(record[:path]).each do |service, remote|
    options = {
      :remote => remote,
      :service => service,
      :attach => record,
      :proxy_cb => method(:proxy_cb)
    }
    proxies[service] = create_proxy(options)
  end

  proxies
end
get_route_target(server_type, msg, route_param) { |err, server_id| ... } click to toggle source

Calculate remote target server id for rpc client

@param [String] server_type @param [Hash] msg @param [Object] route_param

@private

# File lib/citrus-rpc/rpc-client/client.rb, line 263
# Calculate the remote target server id for an rpc request.
#
# When a router type is configured (@router_type) the matching built-in
# routing method is called with this client, the server type and the
# message. The type may be given as a Symbol or as a String; any
# unrecognised value selects rd_route. Without a router type the custom
# @router callable is called with the route parameter, the message and the
# route context.
#
# @param [String] server_type
# @param [Hash] msg
# @param [Object] route_param
# @yield [err, server_id] whatever the selected router reports, or an
#   Exception when the custom router does not respond to call
#
# @private
def get_route_target(server_type, msg, route_param, &block)
  if @router_type
    # Compare by name so 'roundrobin' and :roundrobin pick the same router
    # instead of a String silently falling through to rd_route.
    router_name = case @router_type.to_s
                  when 'roundrobin' then :rr_route
                  when 'weight_roundrobin' then :wrr_route
                  when 'least_active' then :la_route
                  when 'consistent_hash' then :ch_route
                  else :rd_route
                  end
    method(router_name).call(self, server_type, msg) do |err, server_id|
      block.call(err, server_id) if block
    end
  elsif @router.respond_to?(:call)
    @router.call(route_param, msg, @route_context) do |err, server_id|
      block.call(err, server_id) if block
    end
  else
    block.call(Exception.new('invalid router method')) if block
    nil
  end
end
insert_proxy(proxies, namespace, server_type, proxy) click to toggle source

Store a proxy in the proxies container under the given namespace and server type

@param [Object] proxies @param [String] namespace @param [String] server_type @param [Object] proxy

@private

# File lib/citrus-rpc/rpc-client/client.rb, line 322
# Store proxy under proxies[namespace][server_type], creating the
# per-namespace OpenStruct on first use and replacing any proxy already
# stored under the same keys.
#
# @param [Object] proxies
# @param [String] namespace
# @param [String] server_type
# @param [Object] proxy
# @return [Object] the stored proxy
#
# @private
def insert_proxy(proxies, namespace, server_type, proxy)
  bucket = proxies[namespace] || (proxies[namespace] = OpenStruct.new)
  bucket[server_type] = proxy
end
proxy_cb(service, method, attach, is_to_specified_server, *args, &block) click to toggle source

Proxy callback

@param [String] service @param [Symbol] method @param [Hash] attach @param [Boolean] is_to_specified_server

@private

# File lib/citrus-rpc/rpc-client/client.rb, line 230
# Callback invoked by the generated proxies to perform an rpc call.
#
# The first extra argument is the route parameter; the remaining ones are
# sent as the rpc arguments. The call is silently dropped (nil returned,
# block not called) when the client is not in the :state_started state or
# when no extra argument was passed.
#
# @param [String] service
# @param [Symbol] method
# @param [Hash] attach record the proxy was generated from; its :namespace
#   and :server_type are copied into the message
# @param [Boolean] is_to_specified_server when truthy the route parameter is
#   handed to rpc_to_specified_server instead of the router
#
# @private
def proxy_cb(service, method, attach, is_to_specified_server, *args, &block)
  return unless @state == :state_started
  return if args.empty?

  route_param, *call_args = args
  server_type = attach[:server_type]
  msg = {
    :namespace => attach[:namespace],
    :server_type => server_type,
    :service => service,
    :method => method,
    :args => call_args
  }

  if is_to_specified_server
    return rpc_to_specified_server(msg, server_type, route_param, &block)
  end

  get_route_target(server_type, msg, route_param) do |err, server_id|
    if err
      block_given? && block.call(err)
    else
      rpc_invoke(server_id, msg, &block)
    end
  end
end
rpc_to_specified_server(msg, server_type, route_param, &block) click to toggle source

Rpc to specified server id or servers

@param [Hash] msg @param [String] server_type @param [String] route_param

@private

# File lib/citrus-rpc/rpc-client/client.rb, line 298
# Send an rpc message to an explicitly named server, or to every server
# of the given type.
#
# route_param must be exactly a String, otherwise the call is dropped.
# '*' invokes every server known to the mail station whose :server_type
# equals server_type; any other value is used as the target server id.
#
# @param [Hash] msg
# @param [String] server_type
# @param [String] route_param
#
# @private
def rpc_to_specified_server(msg, server_type, route_param, &block)
  return unless route_param.instance_of?(String)
  return rpc_invoke(route_param, msg, &block) unless route_param == '*'

  @station.servers.each do |server_id, server|
    next unless server[:server_type] == server_type

    rpc_invoke(server_id, msg, &block)
  end
end