module Elasticsearch::Transport::Transport::Base

@abstract Module with common functionality for transport implementations.

Constants

DEFAULT_MAX_RETRIES
DEFAULT_PORT
DEFAULT_PROTOCOL
DEFAULT_RELOAD_AFTER
DEFAULT_RESURRECT_AFTER
DEFAULT_SERIALIZER_CLASS
SANITIZED_PASSWORD

Attributes

connections[R]
counter[R]
hosts[R]
last_request_at[R]
logger[RW]
max_retries[RW]
options[R]
protocol[R]
reload_after[RW]
reload_connections[RW]
resurrect_after[RW]
serializer[RW]
sniffer[RW]
tracer[RW]

Public Class Methods

new(arguments={}, &block) click to toggle source

Creates a new transport object

@param arguments [Hash] Settings and options for the transport
@param block [Proc] Lambda or Proc which can be evaluated in the context of the “session” object

@option arguments [Array] :hosts An Array of normalized hosts information
@option arguments [Hash] :options A Hash with options (usually passed by {Client})

@see Client#initialize

# File lib/elasticsearch/transport/transport/base.rb, line 31
# Sets up the transport state from the passed arguments.
#
# Reads `arguments[:hosts]` (defaults to an empty Array) and
# `arguments[:options]` (defaults to an empty Hash), makes sure the
# options always carry `:http` and `:retry_on_status` entries, builds the
# connection collection and then derives the remaining settings from the
# options.
#
# @param arguments [Hash] Expected keys are `:hosts` and `:options`
# @param block [Proc] Stored and handed to `__build_connection` for each host
def initialize(arguments={}, &block)
  @state_mutex = Mutex.new

  @hosts   = arguments[:hosts]   || []
  @options = arguments[:options] || {}
  @options[:http]            ||= {}
  @options[:retry_on_status] ||= []

  # The connections are built first: `__build_connections` reads
  # @hosts, @options and @block, which are all set by now.
  @block       = block
  @connections = __build_connections

  # An explicit serializer instance wins over a serializer class.
  @serializer = options[:serializer] || (options[:serializer_class] || DEFAULT_SERIALIZER_CLASS).new(self)
  @protocol   = options[:protocol] || DEFAULT_PROTOCOL

  @logger = options[:logger]
  @tracer = options[:tracer]

  @sniffer         = (options[:sniffer_class] || Sniffer).new(self)
  @counter         = 0
  @counter_mtx     = Mutex.new
  @last_request_at = Time.now

  # `:reload_connections` is kept as given; only an Integer value is
  # used as the reload interval.
  reload_setting      = options[:reload_connections]
  @reload_connections = reload_setting
  @reload_after       = reload_setting.is_a?(Integer) ? reload_setting : DEFAULT_RELOAD_AFTER
  @resurrect_after    = options[:resurrect_after] || DEFAULT_RESURRECT_AFTER

  # Likewise, only an Integer `:retry_on_failure` overrides the default.
  retry_setting    = options[:retry_on_failure]
  @max_retries     = retry_setting.is_a?(Integer) ? retry_setting : DEFAULT_MAX_RETRIES
  @retry_on_status = Array(options[:retry_on_status]).map(&:to_i)
end

Public Instance Methods

__build_connection(host, options={}, block=nil) click to toggle source

@abstract Build and return a connection.

A transport implementation *must* implement this method.
See {HTTP::Faraday#__build_connection} for an example.

@return [Connections::Connection]
@api private

# File lib/elasticsearch/transport/transport/base.rb, line 151
# Placeholder for the adapter-specific connection factory.
#
# This base version never returns: it always raises, so a transport
# implementation has to provide its own definition.
#
# @param host [Hash] Host information (unused here)
# @param options [Hash] Transport options (unused here)
# @param block [Proc] Configuration block (unused here)
# @raise [NoMethodError] Always
def __build_connection(host, options={}, block=nil)
  message = "Implement this method in your class"
  raise(NoMethodError, message)
end
__build_connections() click to toggle source

Builds and returns a collection of connections

The adapters have to implement the {Base#__build_connection} method.

@return [Connections::Collection]
@api private

# File lib/elasticsearch/transport/transport/base.rb, line 128
# Builds a connection for every entry in `hosts` and wraps them in a
# `Connections::Collection`.
#
# Each host Hash is completed in place before it is passed to
# `__build_connection`:
#
# * `:protocol` is always overwritten from the host's `:scheme`, then
#   `options[:scheme]`, then `options[:http][:scheme]`, then
#   `DEFAULT_PROTOCOL`.
# * `:port` is only filled in when missing, from `options[:port]`, then
#   `options[:http][:port]`, then `DEFAULT_PORT`.
# * `:user` / `:password` are taken from the options only when the host
#   has no user of its own.
#
# @return [Connections::Collection]
def __build_connections
  # Tolerate options without an :http entry instead of failing on nil.
  http_options = options[:http] || {}

  built = hosts.map do |host|
    host[:protocol] = host[:scheme] || options[:scheme] || http_options[:scheme] || DEFAULT_PROTOCOL
    # The port fallback reads the :port entry of the HTTP options (it
    # used to read :scheme, which ignored a configured HTTP port).
    host[:port]   ||= options[:port] || http_options[:port] || DEFAULT_PORT

    default_user = options[:user] || http_options[:user]
    if default_user && !host[:user]
      host[:user]       = default_user
      host[:password] ||= options[:password] || http_options[:password]
    end

    __build_connection(host, (options[:transport_options] || {}), @block)
  end

  Connections::Collection.new(
    :connections    => built,
    :selector_class => options[:selector_class],
    :selector       => options[:selector]
  )
end
__close_connections() click to toggle source

Closes the connections collection

@api private

# File lib/elasticsearch/transport/transport/base.rb, line 159
# Hook invoked by `__rebuild_connections` before the new connections are
# built. The base implementation intentionally does nothing and returns
# nil; adapters that hold resources can override it.
def __close_connections
  # A hook point for specific adapters when they need to close connections
end
__convert_to_json(o=nil, options={}) click to toggle source

Converts any non-String object to JSON

@api private

# File lib/elasticsearch/transport/transport/base.rb, line 209
# Returns `o` unchanged when it already is a String; any other object
# (including nil) is passed to `serializer.dump` together with `options`.
#
# @param o [Object] Value to convert
# @param options [Hash] Options forwarded to the serializer
# @return [Object] The String itself, or whatever `serializer.dump` returns
def __convert_to_json(o=nil, options={})
  return o if o.is_a?(String)

  serializer.dump(o, options)
end
__full_url(host) click to toggle source

Returns a full URL based on information from host

@param host [Hash] Host configuration passed in from {Client}

@api private

# File lib/elasticsearch/transport/transport/base.rb, line 218
# Assembles a URL String from the parts stored in a host Hash.
#
# The result has the form `protocol://[user:password@]host:port[path]`.
# The credentials part is only added when `host[:user]` is set; user and
# password are escaped with `CGI.escape`.
#
# @param host [Hash] Reads `:protocol`, `:user`, `:password`, `:host`,
#   `:port` and `:path`
# @return [String]
def __full_url(host)
  url = "#{host[:protocol]}://"
  if host[:user]
    # A user may be configured without a password; escape an empty
    # String in that case instead of failing on nil.
    url += "#{CGI.escape(host[:user])}:#{CGI.escape(host[:password].to_s)}@"
  end
  url += "#{host[:host]}:#{host[:port]}"
  url += "#{host[:path]}" if host[:path]
  url
end
__log(method, path, params, body, url, response, json, took, duration) click to toggle source

Log request and response information

@api private

# File lib/elasticsearch/transport/transport/base.rb, line 167
# Writes a summary of one request/response pair to `logger`.
#
# The info line carries the upper-cased method, the URL with the password
# part replaced by `SANITIZED_PASSWORD`, the response status, the request
# duration and the `took` value. The request body (when present) and the
# response body go to the debug level.
#
# `path`, `params` and `json` are accepted but not used here.
def __log(method, path, params, body, url, response, json, took, duration)
  masked_url = url.to_s.gsub(/\/\/(.+):(.+)@/, "//\\1:#{SANITIZED_PASSWORD}@")
  elapsed    = sprintf('%.3fs', duration)

  logger.info "#{method.to_s.upcase} #{masked_url} [status:#{response.status}, request:#{elapsed}, query:#{took}]"
  logger.debug "> #{__convert_to_json(body)}" if body
  logger.debug "< #{response.body}"
end
__log_failed(response) click to toggle source

Log failed request

@api private

# File lib/elasticsearch/transport/transport/base.rb, line 179
# Reports a failed response on the `fatal` level of `logger`, formatted as
# the status in square brackets followed by the response body.
#
# @param response [#status, #body]
def __log_failed(response)
  message = "[#{response.status}] #{response.body}"
  logger.fatal message
end
__raise_transport_error(response) click to toggle source

Raise error specific for the HTTP response status or a generic server error

@api private

# File lib/elasticsearch/transport/transport/base.rb, line 200
# Raises the error class registered in `ERRORS` for the response status,
# falling back to `ServerError` when the status has no entry. The message
# is the status in square brackets followed by the response body.
#
# @param response [#status, #body]
# @raise [ServerError] or the class found in `ERRORS`
def __raise_transport_error(response)
  error_class = ERRORS[response.status] || ServerError
  message     = "[#{response.status}] #{response.body}"
  raise error_class.new(message)
end
__rebuild_connections(arguments={}) click to toggle source

Rebuilds the connections collection in the transport.

The method adds new connections from the passed hosts to the collection, and removes all connections not contained in the passed hosts.

@return [Connections::Collection]
@api private

# File lib/elasticsearch/transport/transport/base.rb, line 104
# Synchronizes the existing connection collection with a new host list.
#
# While holding `@state_mutex`, the method replaces `@hosts` and
# `@options` from `arguments`, calls the `__close_connections` hook,
# builds a collection for the new hosts and then updates `@connections`
# in place: connections missing from the new collection are removed and
# connections not yet present are added.
#
# @param arguments [Hash] Reads `:hosts` (default `[]`) and `:options`
#   (default `{}`)
# @return [Object] The updated `@connections` object
def __rebuild_connections(arguments={})
  @state_mutex.synchronize do
    @hosts   = arguments[:hosts]   || []
    @options = arguments[:options] || {}

    __close_connections

    rebuilt = __build_connections
    # Present now but absent from the rebuilt set
    stale = @connections.all.reject { |conn| rebuilt.include?(conn) }
    # In the rebuilt set but not yet known to the current collection
    fresh = rebuilt.reject { |conn| @connections.include?(conn) }

    @connections.remove(stale)
    @connections.add(fresh)
    @connections
  end
end
__trace(method, path, params, body, url, response, json, took, duration) click to toggle source

Trace the request in the `curl` format

@api private

# File lib/elasticsearch/transport/transport/base.rb, line 187
# Emits the request as a `curl` command line, and the response as
# `#`-prefixed comment lines, to `tracer`.
#
# The traced URL is always built against `http://localhost:9200/` plus
# `path` with `?pretty` appended; the real `url` argument is not used.
# Non-empty `params` are appended as a query string. When `json` is given
# it is dumped through the serializer in pretty form, otherwise the raw
# response body is written.
#
# `url` and `took` are accepted but not used here.
def __trace(method, path, params, body, url, response, json, took, duration)
  query      = params.empty? ? '' : "&#{::Faraday::Utils::ParamsHash[params].to_query}"
  trace_url  = "http://localhost:9200/#{path}?pretty" + query
  trace_body = body ? " -d '#{__convert_to_json(body, :pretty => true)}'" : ''

  tracer.info  "curl -X #{method.to_s.upcase} '#{trace_url}'#{trace_body}\n"
  tracer.debug "# #{Time.now.iso8601} [#{response.status}] (#{format('%.3f', duration)}s)\n#"

  output =
    if json
      # Prefix every line with "# " and move the closing brace to its own line
      serializer.dump(json, :pretty => true).gsub(/^/, '# ').sub(/\}$/, "\n# }") + "\n"
    else
      "# #{response.body}\n"
    end
  tracer.debug output
end
get_connection(options={}) click to toggle source

Returns a connection from the connection pool by delegating to {Connections::Collection#get_connection}.

Resurrects dead connections if the `resurrect_after` timeout has passed. Increments the counter and performs connection reloading if the `reload_connections` option is set.

@return [Connections::Connection]
@see Connections::Collection#get_connection

# File lib/elasticsearch/transport/transport/base.rb, line 67
# Picks a connection by delegating to `connections.get_connection`.
#
# Before delegating, the method
# * calls `resurrect_dead_connections!` when more than `@resurrect_after`
#   has elapsed since `@last_request_at`,
# * increments `@counter` while holding `@counter_mtx`, and
# * calls `reload_connections!` when `reload_connections` is set and the
#   counter is a multiple of `reload_after`.
#
# @param options [Hash] Passed through to the collection
# @return [Object] Whatever `connections.get_connection` returns
def get_connection(options={})
  resurrect_deadline = @last_request_at + @resurrect_after
  resurrect_dead_connections! if Time.now > resurrect_deadline

  @counter_mtx.synchronize { @counter += 1 }

  if reload_connections && (counter % reload_after).zero?
    reload_connections!
  end

  connections.get_connection(options)
end
host_unreachable_exceptions() click to toggle source

@abstract Returns an Array of connection errors specific to the transport implementation.

See {HTTP::Faraday#host_unreachable_exceptions} for an example.

@return [Array]

# File lib/elasticsearch/transport/transport/base.rb, line 338
# Lists the exception classes that `perform_request` treats as "host
# unreachable" (it splats this Array into a `rescue` clause). The base
# implementation only knows about a refused connection.
#
# @return [Array<Class>]
def host_unreachable_exceptions
  [Errno::ECONNREFUSED]
end
perform_request(method, path, params={}, body=nil, &block) click to toggle source

Performs a request to Elasticsearch, while handling logging, tracing, marking dead connections, retrying the request and reloading the connections.

@abstract The transport implementation has to implement this method either in full, or by invoking this method with a block. See {HTTP::Faraday#perform_request} for an example.

@param method [String] Request method
@param path [String] The API endpoint
@param params [Hash] Request parameters (will be serialized by {Connections::Connection#full_url})
@param body [Hash] Request body (will be serialized by the {#serializer})
@param block [Proc] Code block to evaluate, passed from the implementation

@return [Response]
@raise [NoMethodError] If no block is passed
@raise [ServerError] If request failed on server
@raise [Error] If no connection is available

# File lib/elasticsearch/transport/transport/base.rb, line 243
# Runs one logical request, including retries, and returns a `Response`.
#
# The actual I/O is done by the passed block, which receives the chosen
# connection and the full URL and must return an object responding to
# `status`, `body` and `headers`. Around that call this method
#
# * obtains a connection via `get_connection`,
# * retries on the statuses listed in `@retry_on_status` and on the
#   exceptions from `host_unreachable_exceptions` (marking the connection
#   dead and optionally reloading connections first),
# * logs and traces the exchange when `logger` / `tracer` are set,
# * raises a transport error for statuses >= 300 unless the status is
#   listed in the `:ignore` parameter,
# * deserializes the body when the content type mentions JSON.
#
# `@last_request_at` is updated on every exit path.
#
# @param method [String] Request method
# @param path [String] The API endpoint
# @param params [Hash] Request parameters; the `:ignore` entry is consumed
#   here and not sent
# @param body [Object] Request body, used here for logging and tracing
# @param block [Proc] Performs the request for `(connection, url)`
# @return [Response]
# @raise [NoMethodError] If no block is passed
# @raise [ServerError] If request failed on server
# @raise [Error] If no connection is available
def perform_request(method, path, params={}, body=nil, &block)
  raise NoMethodError, "Implement this method in your transport class" unless block_given?
  start = Time.now if logger || tracer
  tries = 0

  # Work on a copy so that removing :ignore does not touch the caller's Hash
  params = params.clone

  ignore = Array(params.delete(:ignore)).compact.map { |s| s.to_i }

  begin
    tries     += 1
    connection = get_connection or raise Error.new("Cannot get new connection from pool.")

    # Merge parameters configured on the underlying connection object, if any
    if connection.connection.respond_to?(:params) && connection.connection.params.respond_to?(:to_hash)
      params = connection.connection.params.merge(params.to_hash)
    end

    url        = connection.full_url(path, params)

    response   = block.call(connection, url)

    connection.healthy! if connection.failures > 0

    # Raise an exception so we can catch it for `retry_on_status`
    __raise_transport_error(response) if response.status.to_i >= 300 && @retry_on_status.include?(response.status.to_i)

  rescue Elasticsearch::Transport::Transport::ServerError => e
    # `response` is still nil when the error was raised before the block
    # returned; in that case there is no status to match and the error is
    # re-raised as is. The status is compared as an Integer, matching the
    # check that raised the error above.
    if response && @retry_on_status.include?(response.status.to_i)
      logger.warn "[#{e.class}] Attempt #{tries} to get response from #{url}" if logger
      if tries <= max_retries
        retry
      else
        logger.fatal "[#{e.class}] Cannot get response from #{url} after #{tries} tries" if logger
        raise e
      end
    else
      raise e
    end

  rescue *host_unreachable_exceptions => e
    logger.error "[#{e.class}] #{e.message} #{connection.host.inspect}" if logger

    connection.dead!

    # Try a fresh set of connections first, bounded by the pool size
    if @options[:reload_on_failure] and tries < connections.all.size
      logger.warn "[#{e.class}] Reloading connections (attempt #{tries} of #{connections.all.size})" if logger
      reload_connections! and retry
    end

    if @options[:retry_on_failure]
      logger.warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}" if logger
      if tries <= max_retries
        retry
      else
        logger.fatal "[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries" if logger
        raise e
      end
    else
      raise e
    end

  rescue Exception => e
    # Nothing is swallowed here: the error is only logged and re-raised
    logger.fatal "[#{e.class}] #{e.message} (#{connection.host.inspect if connection})" if logger
    raise e

  end #/begin

  duration = Time.now-start if logger || tracer

  if response.status.to_i >= 300
    __log    method, path, params, body, url, response, nil, 'N/A', duration if logger
    __trace  method, path, params, body, url, response, nil, 'N/A', duration if tracer

    # Log the failure only when `ignore` doesn't match the response status
    __log_failed response if logger && !ignore.include?(response.status.to_i)

    __raise_transport_error response unless ignore.include?(response.status.to_i)
  end

  json     = serializer.load(response.body) if response.body && !response.body.empty? && response.headers && response.headers["content-type"] =~ /json/
  # Any failure while reading `took` from the parsed body falls back to 'n/a'
  took     = (json['took'] ? sprintf('%.3fs', json['took']/1000.0) : 'n/a') rescue 'n/a' if logger || tracer

  __log   method, path, params, body, url, response, json, took, duration if logger && !ignore.include?(response.status.to_i)
  __trace method, path, params, body, url, response, json, took, duration if tracer

  Response.new response.status, json || response.body, response.headers
ensure
  @last_request_at = Time.now
end
reload_connections!() click to toggle source

Reloads and replaces the connection collection based on cluster information

@see Sniffer#hosts

# File lib/elasticsearch/transport/transport/base.rb, line 79
# Asks the sniffer for the current hosts and rebuilds the connection
# collection from them, reusing the current options.
#
# A `SnifferTimeoutError` is not propagated: it is logged (when a logger
# is set) and the existing connections are left as they are.
#
# @return [self]
def reload_connections!
  begin
    __rebuild_connections(:hosts => sniffer.hosts, :options => options)
  rescue SnifferTimeoutError
    logger.error "[SnifferTimeoutError] Timeout when reloading connections." if logger
  end

  self
end
resurrect_dead_connections!() click to toggle source

Tries to “resurrect” all eligible dead connections

@see Connections::Connection#resurrect!

# File lib/elasticsearch/transport/transport/base.rb, line 92
# Calls `resurrect!` on every connection returned by `connections.dead`.
#
# @return [Object] The value of `each`, i.e. the receiver `connections.dead`
def resurrect_dead_connections!
  connections.dead.each(&:resurrect!)
end