class MsgpackRpcClient

MessagePack-RPC client

Constants

DEFAULT_CONNECT_RETRY_WAIT
DEFAULT_MAX_CONNECT_RETRIES
DEFAULT_MAX_RETRIES
MAX_MSGID
VERSION

Attributes

connect_retry_wait[R]
host[R]
max_connect_retries[R]
max_retries[R]
port[R]
use_ssl[R]

Public Class Methods

new(options={}) click to toggle source

Initialize client and establish connection to server.

(While it may seem beneficial to not connect in the constructor and wait for the first RPC call, I believe it’s better to fail early.

  • host, port, use_ssl - configure your connection

  • logger - logger (default nil, set to Rails.logger in a Rails app)

Parameters that are better left alone:

TODO: once we are long past the <2.0.0 legacy, replace with named args

# File lib/msgpack_rpc_client.rb, line 30
def initialize(options={})
  @host = options.fetch(:host)
  @port = options.fetch(:port)
  @use_ssl = options.fetch(:use_ssl, false)
  @logger = options.fetch(:logger, nil)
  @max_retries = options.fetch(:max_retries, DEFAULT_MAX_RETRIES)
  @max_connect_retries = options.fetch(:max_connect_retries, DEFAULT_MAX_CONNECT_RETRIES)
  @connect_retry_wait = options.fetch(:connect_retry_wait, DEFAULT_CONNECT_RETRY_WAIT)
  @msgid = 1
  @call_mutex = Mutex.new
  init_socket
end

Public Instance Methods

call(method_name, *params) click to toggle source

Call an RPC method. Will reconnect if the server is down. Threadsafe.

  • Params is anything serializable with Messagepack.

  • Hashes in response will be deserialized with symbolized keys

# File lib/msgpack_rpc_client.rb, line 47
def call(method_name, *params)
  request = nil
  response = nil

  @call_mutex.synchronize do
    request = [0, @msgid, method_name, params]
    @msgid = (@msgid % MAX_MSGID) + 1
    response = make_request_with_retries(request)
  end

  if response[0] != 1
    raise MsgpackRpcClient::Error, 'Response does not bear the proper type flag - something is very wrong'
  end
  if response[1] != request[1]
    raise MsgpackRpcClient::Error, 'Response message id does not match request message id - something is very wrong'
  end
  if response[2] != nil
    raise MsgpackRpcClient::Error, "Server responded with error: #{response[2]}"
  end

  response[3]
end

Private Instance Methods

configure_socket_keepalive() click to toggle source

Configure the TCP stack to send keepalive messages, as we want a long-living connection.

(Not 100% reliable)

# File lib/msgpack_rpc_client.rb, line 119
def configure_socket_keepalive
  @socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
  if defined?(Socket::TCP_KEEPINTVL) # Not available on JRuby
    @socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_KEEPINTVL, 10)
    @socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_KEEPCNT, 5)
  end
  if defined?(Socket::TCP_KEEPIDLE) # Not available on BSD / OSX
    @socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_KEEPIDLE, 50)
  end
end
init_socket() click to toggle source

Opens a socket according to provided configuration. Retries on error.

# File lib/msgpack_rpc_client.rb, line 91
def init_socket
  @socket.close if @socket
  retry_count = 0
  begin
    @logger.info("[MSGPACK-RPC] Connecting to Msgpack-RPC server...") if @logger
    @socket = TCPSocket.new(host, port)
    configure_socket_keepalive
    init_ssl if use_ssl
  rescue Errno::ECONNREFUSED
    @logger.error("[MSGPACK-RPC] Connection refused") if @logger
    retry_count += 1
    if retry_count == max_connect_retries
      raise MsgpackRpcClient::Error, "Could not connect to MsgPack-RPC server"
    else
      sleep(connect_retry_wait)
      # might have a chance with a different instance
      retry
    end
  end
  # Attach streaming packer/unpacker to the socket
  @packer = MessagePack::Packer.new(@socket)
  @unpacker = MessagePack::Unpacker.new(@socket, symbolize_keys: true)
end
init_ssl() click to toggle source

Open an SSL socket on top of the TCP socket.

VERIFY_PEER is mandatory; if you have problems with it, just don’t use SSL - without verification it gives no security benefits but only increases cpu load.

# File lib/msgpack_rpc_client.rb, line 134
def init_ssl
  ctx = OpenSSL::SSL::SSLContext.new
  ctx.set_params(verify_mode: OpenSSL::SSL::VERIFY_PEER)
  # We are overwriting the TCP socket with the SSL socket here.
  @socket = OpenSSL::SSL::SSLSocket.new(@socket, ctx)
  @socket.sync_close = true
  @socket.connect
end
make_request_with_retries(request) click to toggle source

Handles socket connectivity details of sending and receiving. Retries on error.

# File lib/msgpack_rpc_client.rb, line 73
def make_request_with_retries(request)
  retry_count = 0
  begin
    @packer.write(request).flush
    @unpacker.read
  rescue EOFError, IOError, Errno::EPIPE, Errno::ETIMEDOUT, Errno::ECONNRESET
    @logger.error("[MSGPACK-RPC] Msgpack-RPC socket interrupted. Re-establishing commmunications.") if @logger
    retry_count += 1
    if retry_count == max_retries
      raise MsgpackRpcClient::Error, "Failed to re-establish communications with server"
    else
      init_socket
      retry
    end
  end
end