module Elasticsearch::Transport::Transport::Base

@abstract Module with common functionality for transport implementations.

Constants

DEFAULT_MAX_RETRIES
DEFAULT_PORT
DEFAULT_PROTOCOL
DEFAULT_RELOAD_AFTER
DEFAULT_RESURRECT_AFTER
DEFAULT_SERIALIZER_CLASS

Attributes

connections[R]
counter[R]
hosts[R]
last_request_at[R]
logger[RW]
max_retries[RW]
options[R]
protocol[R]
reload_after[RW]
reload_connections[RW]
resurrect_after[RW]
serializer[RW]
sniffer[RW]
tracer[RW]

Public Class Methods

new(arguments={}, &block) click to toggle source

Creates a new transport object.

@param arguments [Hash] Settings and options for the transport @param block [Proc] Lambda or Proc which can be evaluated in the context of the “session” object

@option arguments [Array] :hosts An Array of normalized hosts information @option arguments [Array] :options A Hash with options (usually passed by {Client})

@see Client#initialize

# File lib/elasticsearch/transport/transport/base.rb, line 30
def initialize(arguments={}, &block)
  @hosts       = arguments[:hosts]   || []
  @options     = arguments[:options] || {}
  @block       = block
  @connections = __build_connections

  @serializer  = options[:serializer] || ( options[:serializer_class] ? options[:serializer_class].new(self) : DEFAULT_SERIALIZER_CLASS.new(self) )
  @protocol    = options[:protocol] || DEFAULT_PROTOCOL

  @logger      = options[:logger]
  @tracer      = options[:tracer]

  @sniffer     = options[:sniffer_class] ? options[:sniffer_class].new(self) : Sniffer.new(self)
  @counter     = 0
  @last_request_at = Time.now
  @reload_connections = options[:reload_connections]
  @reload_after    = options[:reload_connections].is_a?(Fixnum) ? options[:reload_connections] : DEFAULT_RELOAD_AFTER
  @resurrect_after = options[:resurrect_after] || DEFAULT_RESURRECT_AFTER
  @max_retries     = options[:retry_on_failure].is_a?(Fixnum)   ? options[:retry_on_failure]   : DEFAULT_MAX_RETRIES
end

Public Instance Methods

__build_connections() click to toggle source

@abstract A transport implementation must implement this method.

See {HTTP::Faraday#__build_connections} for an example.

@return [Connections::Collection] @api private

# File lib/elasticsearch/transport/transport/base.rb, line 255
def __build_connections
  raise NoMethodError, "Implement this method in your class"
end
__convert_to_json(o=nil, options={}) click to toggle source

Converts any non-String object to JSON

@api private

# File lib/elasticsearch/transport/transport/base.rb, line 141
def __convert_to_json(o=nil, options={})
  o = o.is_a?(String) ? o : serializer.dump(o, options)
end
__full_url(host) click to toggle source

Returns a full URL based on information from host

@param host [Hash] Host configuration passed in from {Client}

@api private

# File lib/elasticsearch/transport/transport/base.rb, line 150
def __full_url(host)
  url  = "#{host[:protocol]}://"
  url += "#{host[:user]}:#{host[:password]}@" if host[:user]
  url += "#{host[:host]}:#{host[:port]}"
  url += "#{host[:path]}" if host[:path]
  url
end
__log(method, path, params, body, url, response, json, took, duration) click to toggle source

Log request and response information.

@api private

# File lib/elasticsearch/transport/transport/base.rb, line 104
def __log(method, path, params, body, url, response, json, took, duration)
  logger.info  "#{method.to_s.upcase} #{url} " +
               "[status:#{response.status}, request:#{sprintf('%.3fs', duration)}, query:#{took}]"
  logger.debug "> #{__convert_to_json(body)}" if body
  logger.debug "< #{response.body}"
end
__log_failed(response) click to toggle source

Log failed request.

@api private

# File lib/elasticsearch/transport/transport/base.rb, line 114
def __log_failed(response)
  logger.fatal "[#{response.status}] #{response.body}"
end
__raise_transport_error(response) click to toggle source

Raise error specific for the HTTP response status or a generic server error

@api private

# File lib/elasticsearch/transport/transport/base.rb, line 133
def __raise_transport_error(response)
  error = ERRORS[response.status] || ServerError
  raise error.new "[#{response.status}] #{response.body}"
end
__rebuild_connections(arguments={}) click to toggle source

Replaces the connections collection.

@api private

# File lib/elasticsearch/transport/transport/base.rb, line 94
def __rebuild_connections(arguments={})
  @hosts       = arguments[:hosts]    || []
  @options     = arguments[:options]  || {}
  @connections = __build_connections
end
__trace(method, path, params, body, url, response, json, took, duration) click to toggle source

Trace the request in the ‘curl` format.

@api private

# File lib/elasticsearch/transport/transport/base.rb, line 121
def __trace(method, path, params, body, url, response, json, took, duration)
  trace_url  = "http://localhost:9200/#{path}?pretty" +
               ( params.empty? ? '' : "&#{::Faraday::Utils::ParamsHash[params].to_query}" )
  trace_body = body ? " -d '#{__convert_to_json(body, :pretty => true)}'" : ''
  tracer.info  "curl -X #{method.to_s.upcase} '#{trace_url}'#{trace_body}\n"
  tracer.debug "# #{Time.now.iso8601} [#{response.status}] (#{format('%.3f', duration)}s)\n#"
  tracer.debug json ? serializer.dump(json, :pretty => true).gsub(/^/, '# ').sub(/\}$/, "\n# }")+"\n" : "# #{response.body}\n"
end
get_connection(options={}) click to toggle source

Returns a connection from the connection pool by delegating to {Connections::Collection#get_connection}.

Resurrects dead connection if the ‘resurrect_after` timeout has passed. Increments the counter and performs connection reloading if the `reload_connections` option is set.

@return [Connections::Connection] @see Connections::Collection#get_connection

# File lib/elasticsearch/transport/transport/base.rb, line 59
def get_connection(options={})
  resurrect_dead_connections! if Time.now > @last_request_at + @resurrect_after

  connection = connections.get_connection(options)
  @counter  += 1

  reload_connections!         if reload_connections && counter % reload_after == 0
  connection
end
host_unreachable_exceptions() click to toggle source

@abstract Returns an Array of connection errors specific to the transport implementation.

See {HTTP::Faraday#host_unreachable_exceptions} for an example.

@return [Array]

# File lib/elasticsearch/transport/transport/base.rb, line 246
def host_unreachable_exceptions
  [Errno::ECONNREFUSED, Elasticsearch::Transport::Transport::Errors::BadGateway]
end
perform_request(method, path, params={}, body=nil, &block) click to toggle source

Performs a request to Elasticsearch, while handling logging, tracing, marking dead connections, retrying the request and reloading the connections.

@abstract The transport implementation has to implement this method either in full,

or by invoking this method with a block. See {HTTP::Faraday#perform_request} for an example.

@param method [String] Request method @param path [String] The API endpoint @param params [Hash] Request parameters (will be serialized by {Connections::Connection#full_url}) @param body [Hash] Request body (will be serialized by the {#serializer}) @param block [Proc] Code block to evaluate, passed from the implementation

@return [Response] @raise [NoMethodError] If no block is passed @raise [ServerError] If request failed on server @raise [Error] If no connection is available

# File lib/elasticsearch/transport/transport/base.rb, line 175
def perform_request(method, path, params={}, body=nil, &block)
  raise NoMethodError, "Implement this method in your transport class" unless block_given?
  start = Time.now if logger || tracer
  tries = 0

  begin
    tries     += 1
    connection = get_connection or raise Error.new("Cannot get new connection from pool.")

    if connection.connection.respond_to?(:params) && connection.connection.params.respond_to?(:to_hash)
      params = connection.connection.params.merge(params.to_hash)
    end

    url        = connection.full_url(path, params)

    response   = block.call(connection, url)

    connection.healthy! if connection.failures > 0

  rescue *host_unreachable_exceptions => e
    logger.error "[#{e.class}] #{e.message} #{connection.host.inspect}" if logger

    connection.dead!

    if @options[:reload_on_failure] and tries < connections.all.size
      logger.warn "[#{e.class}] Reloading connections (attempt #{tries} of #{connections.all.size})" if logger
      reload_connections! and retry
    end

    if @options[:retry_on_failure]
      logger.warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}" if logger
      if tries <= max_retries
        retry
      else
        logger.fatal "[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries" if logger
        raise e
      end
    else
      raise e
    end

  rescue Exception => e
    logger.fatal "[#{e.class}] #{e.message} (#{connection.host.inspect if connection})" if logger
    raise e
  end

  duration = Time.now-start if logger || tracer

  if response.status.to_i >= 300
    __log    method, path, params, body, url, response, nil, 'N/A', duration if logger
    __trace  method, path, params, body, url, response, nil, 'N/A', duration if tracer
    __log_failed response if logger
    __raise_transport_error response
  end

  json     = serializer.load(response.body) if response.headers && response.headers["content-type"] =~ /json/
  took     = (json['took'] ? sprintf('%.3fs', json['took']/1000.0) : 'n/a') rescue 'n/a' if logger || tracer

  __log   method, path, params, body, url, response, json, took, duration if logger
  __trace method, path, params, body, url, response, json, took, duration if tracer

  Response.new response.status, json || response.body, response.headers
ensure
  @last_request_at = Time.now
end
reload_connections!() click to toggle source

Reloads and replaces the connection collection based on cluster information.

@see Sniffer#hosts

# File lib/elasticsearch/transport/transport/base.rb, line 73
def reload_connections!
  hosts = sniffer.hosts
  __rebuild_connections :hosts => hosts, :options => options
  self
rescue SnifferTimeoutError
  logger.error "[SnifferTimeoutError] Timeout when reloading connections." if logger
  self
end
resurrect_dead_connections!() click to toggle source

Tries to “resurrect” all eligible dead connections.

@see Connections::Connection#resurrect!

# File lib/elasticsearch/transport/transport/base.rb, line 86
def resurrect_dead_connections!
  connections.dead.each { |c| c.resurrect! }
end