class Protobuf::Rpc::Connectors::Base

Attributes

complete_cb[RW]
error[R]
failure_cb[RW]
options[R]
stats[RW]
success_cb[RW]

Public Class Methods

new(options) click to toggle source
# File lib/protobuf/rpc/connectors/base.rb, line 30
def initialize(options)
  @options = DEFAULT_OPTIONS.merge(options)
  @stats = ::Protobuf::Rpc::Stat.new(:CLIENT)
end

Public Instance Methods

any_callbacks?() click to toggle source
# File lib/protobuf/rpc/connectors/base.rb, line 35
def any_callbacks?
  [@complete_cb, @failure_cb, @success_cb].any?
end
close_connection() click to toggle source
# File lib/protobuf/rpc/connectors/base.rb, line 39
def close_connection
  fail 'If you inherit a Connector from Base you must implement close_connection'
end
complete() click to toggle source
# File lib/protobuf/rpc/connectors/base.rb, line 43
def complete
  @stats.stop
  logger.info { @stats.to_s }
  logger.debug { sign_message('Response proceessing complete') }
  @complete_cb.call(self) unless @complete_cb.nil?
rescue => e
  logger.error { sign_message('Complete callback error encountered') }
  log_exception(e)
  raise
end
data_callback(data) click to toggle source
# File lib/protobuf/rpc/connectors/base.rb, line 54
def data_callback(data)
  logger.debug { sign_message('Using data_callback') }
  @used_data_callback = true
  @data = data
end
failure(code, message) click to toggle source

All failures should be routed through this method.

@param [Symbol] code The code we're using (see ::Protobuf::Socketrpc::ErrorReason) @param [String] message The error message

# File lib/protobuf/rpc/connectors/base.rb, line 64
def failure(code, message)
  @error = ClientError.new
  @stats.status = @error.code = ::Protobuf::Socketrpc::ErrorReason.fetch(code)
  @error.message = message

  logger.debug { sign_message("Server failed request (invoking on_failure): #{@error.inspect}") }

  @failure_cb.call(@error) unless @failure_cb.nil?
rescue => e
  logger.error { sign_message("Failure callback error encountered") }
  log_exception(e)
  raise
ensure
  complete
end
first_alive_load_balance?() click to toggle source
# File lib/protobuf/rpc/connectors/base.rb, line 80
def first_alive_load_balance?
  ENV.key?("PB_FIRST_ALIVE_LOAD_BALANCE") ||
    options[:first_alive_load_balance]
end
initialize_stats() click to toggle source
# File lib/protobuf/rpc/connectors/base.rb, line 85
def initialize_stats
  @stats = ::Protobuf::Rpc::Stat.new(:CLIENT)
  @stats.server = [@options[:port], @options[:host]]
  @stats.service = @options[:service].name
  @stats.method_name = @options[:method].to_s
rescue => ex
  log_exception(ex)
  failure(:RPC_ERROR, "Invalid stats configuration. #{ex.message}")
end
log_signature() click to toggle source
# File lib/protobuf/rpc/connectors/base.rb, line 95
def log_signature
  @_log_signature ||= "[client-#{self.class}]"
end
parse_response() click to toggle source
# File lib/protobuf/rpc/connectors/base.rb, line 99
def parse_response
  # Close up the connection as we no longer need it
  close_connection

  logger.debug { sign_message("Parsing response from server (connection closed)") }

  # Parse out the raw response
  @stats.response_size = @response_data.size unless @response_data.nil?
  response_wrapper = ::Protobuf::Socketrpc::Response.decode(@response_data)
  @stats.server = response_wrapper.server if response_wrapper.field?(:server)

  # Determine success or failure based on parsed data
  if response_wrapper.field?(:error_reason)
    logger.debug { sign_message("Error response parsed") }

    # fail the call if we already know the client is failed
    # (don't try to parse out the response payload)
    failure(response_wrapper.error_reason, response_wrapper.error)
  else
    logger.debug { sign_message("Successful response parsed") }

    # Ensure client_response is an instance
    parsed = @options[:response_type].decode(response_wrapper.response_proto.to_s)

    if parsed.nil? && !response_wrapper.field?(:error_reason)
      failure(:BAD_RESPONSE_PROTO, 'Unable to parse response from server')
    else
      verify_callbacks
      succeed(parsed)
      return @data if @used_data_callback
    end
  end
end
ping_port() click to toggle source
# File lib/protobuf/rpc/connectors/base.rb, line 133
def ping_port
  @ping_port ||= ENV["PB_RPC_PING_PORT"]
end
ping_port_enabled?() click to toggle source
# File lib/protobuf/rpc/connectors/base.rb, line 137
def ping_port_enabled?
  ENV.key?("PB_RPC_PING_PORT")
end
request_bytes() click to toggle source
# File lib/protobuf/rpc/connectors/base.rb, line 141
def request_bytes
  validate_request_type!
  return ::Protobuf::Socketrpc::Request.encode(request_fields)
rescue => e
  failure(:INVALID_REQUEST_PROTO, "Could not set request proto: #{e.message}")
end
request_caller() click to toggle source
# File lib/protobuf/rpc/connectors/base.rb, line 148
def request_caller
  @options[:client_host] || ::Protobuf.client_host
end
request_fields() click to toggle source
# File lib/protobuf/rpc/connectors/base.rb, line 152
def request_fields
  { :service_name => @options[:service].name,
    :method_name => @options[:method].to_s,
    :request_proto => @options[:request],
    :caller => request_caller }
end
send_request() click to toggle source
# File lib/protobuf/rpc/connectors/base.rb, line 159
def send_request
  fail 'If you inherit a Connector from Base you must implement send_request'
end
setup_connection() click to toggle source
# File lib/protobuf/rpc/connectors/base.rb, line 163
def setup_connection
  initialize_stats
  @request_data = request_bytes
  @stats.request_size = @request_data.size
end
succeed(response) click to toggle source
# File lib/protobuf/rpc/connectors/base.rb, line 169
def succeed(response)
  logger.debug { sign_message("Server succeeded request (invoking on_success)") }
  @success_cb.call(response) unless @success_cb.nil?
rescue => e
  logger.error { sign_message("Success callback error encountered") }
  log_exception(e)
  failure(:RPC_ERROR, "An exception occurred while calling on_success: #{e.message}")
ensure
  complete
end
timeout() click to toggle source
# File lib/protobuf/rpc/connectors/base.rb, line 180
def timeout
  if options[:timeout]
    options[:timeout]
  else
    300 # seconds
  end
end
timeout_wrap(&block) click to toggle source

Wrap the given block in a timeout of the configured number of seconds.

# File lib/protobuf/rpc/connectors/base.rb, line 190
def timeout_wrap(&block)
  ::Timeout.timeout(timeout, &block)
rescue ::Timeout::Error
  failure(:RPC_FAILED, "The server took longer than #{timeout} seconds to respond")
end
validate_request_type!() click to toggle source
# File lib/protobuf/rpc/connectors/base.rb, line 196
def validate_request_type!
  unless @options[:request].class == @options[:request_type]
    expected = @options[:request_type].name
    actual = @options[:request].class.name
    failure(:INVALID_REQUEST_PROTO, "Expected request type to be type of #{expected}, got #{actual} instead")
  end
end
verify_callbacks() click to toggle source
# File lib/protobuf/rpc/connectors/base.rb, line 204
def verify_callbacks
  unless any_callbacks?
    logger.debug { sign_message("No callbacks set, using data_callback") }
    @success_cb = @failure_cb = method(:data_callback)
  end
end
verify_options!() click to toggle source
# File lib/protobuf/rpc/connectors/base.rb, line 211
def verify_options!
  # Verify the options that are necessary and merge them in
  [:service, :method, :host, :port].each do |opt|
    failure(:RPC_ERROR, "Invalid client connection configuration. #{opt} must be a defined option.") if @options[opt].nil?
  end
end