class LogStash::Outputs::AES::HttpClient

Constants

DEFAULT_OPTIONS

Attributes

client[R]
client_options[R]
options[R]

Public Class Methods

new(options={})
# File lib/logstash/outputs/amazon_es/http_client.rb, line 15
# Creates a new HTTP client wrapper.
#
# The supplied options are layered on top of DEFAULT_OPTIONS (caller values
# win on key collisions), and the merged result is both stored and handed
# to build_client to construct the underlying client.
#
# options - Hash of settings overriding DEFAULT_OPTIONS (default: {}).
def initialize(options={})
  merged = DEFAULT_OPTIONS.merge(options)

  @logger  = Cabin::Channel.get
  @options = merged
  @client  = build_client(merged)
end

Private Class Methods

normalize_bulk_response(bulk_response)
# File lib/logstash/outputs/amazon_es/http_client.rb, line 90
# Reduces a raw Bulk API response to the pieces callers need.
#
# The structure of the response from the REST Bulk API is as follows:
#
#   {"took"=>74, "errors"=>true, "items"=>[{"create"=>{"_index"=>"logstash-2014.11.17",
#                                                      "_type"=>"logs",
#                                                      "_id"=>"AUxTS2C55Jrgi-hC6rQF",
#                                                      "_version"=>1,
#                                                      "status"=>400,
#                                                      "error"=>"MapperParsingException[failed to parse]..."}}]}
#
# bulk_response - Hash-like response; only "errors" and "items" are read.
#
# Returns {"errors" => false} when the "errors" entry is falsy, otherwise
# {"errors" => true, "statuses" => [...]} with one status per item, in order.
def self.normalize_bulk_response(bulk_response)
  return {"errors" => false} unless bulk_response["errors"]

  statuses = bulk_response["items"].map do |item|
    # Each item is a one-entry hash of {OPTYPE => details}; `first` yields
    # that entry as a [key, value] pair, so index 1 is the details hash.
    entry = item.first
    details = entry[1]
    details['status']
  end

  {"errors" => true, "statuses" => statuses}
end

Public Instance Methods

bulk(actions)
# File lib/logstash/outputs/amazon_es/http_client.rb, line 29
# Submits a batch of actions through the client's bulk call and returns the
# normalized result (see normalize_bulk_response).
#
# actions - Enumerable of [action, args, source] triples:
#           action - operation name String (e.g. 'index', 'update', 'delete').
#           args   - Hash of action metadata. An :_upsert entry is never sent
#                    as metadata; for 'update' actions it becomes the
#                    'upsert' document unless @options[:doc_as_upsert] is set.
#           source - document body; omitted from the request when nil/false
#                    or when the action is 'delete'.
#
# The args hashes passed in are NOT modified: the metadata sent to the client
# is built from a filtered copy, so the same triples can be submitted again
# (e.g. by a caller that resubmits them) without losing their :_upsert
# document.
#
# Returns the Hash produced by normalize_bulk_response.
# Raises LogStash::ConfigurationError for an 'update' action without :_id.
def bulk(actions)
  bulk_body = actions.collect do |action, args, source|
    if action == 'update'
      unless args[:_id]
        raise(LogStash::ConfigurationError, "Specifying action => 'update' without a document '_id' is not supported.")
      end

      source = { 'doc' => source }
      if @options[:doc_as_upsert]
        source['doc_as_upsert'] = true
      elsif args[:_upsert]
        source['upsert'] = args[:_upsert]
      end
    end

    # :_upsert is not action metadata; drop it from a copy rather than
    # deleting it from the caller's hash.
    metadata = args.reject { |key, _value| key == :_upsert }

    if source && action != 'delete'
      [{ action => metadata }, source]
    else
      { action => metadata }
    end
  end.flatten

  bulk_response = @client.bulk(:body => bulk_body)

  self.class.normalize_bulk_response(bulk_response)
end
template_install(name, template, force=false)
# File lib/logstash/outputs/amazon_es/http_client.rb, line 21
# Installs an index template unless one with the same name is already
# present.
#
# name     - template name to look up and to store under.
# template - template body handed to template_put.
# force    - when truthy, store the template even if one already exists
#            (default: false). The existence check is performed either way.
#
# Returns whatever template_put returns when the template is written, or nil
# when installation is skipped.
def template_install(name, template, force=false)
  already_present = template_exists?(name)
  return template_put(name, template) if force || !already_present

  @logger.debug("Found existing Elasticsearch template. Skipping template management", :name => name)
  nil
end

Private Instance Methods

build_client(options)
# File lib/logstash/outputs/amazon_es/http_client.rb, line 59
# Builds the Elasticsearch client from the merged plugin options.
#
# options - Hash read for :hosts, :port, :protocol, :region,
#           :client_settings (optional Hash with :path and :proxy),
#           :aws_access_key_id, :aws_secret_access_key, :user and :password.
#
# Side effect: stores the connection settings in @client_options. The AWS
# keys and the Basic auth header are only added to the separate hash passed
# to the client, so they do not appear in @client_options.
#
# Returns the new Elasticsearch::Client.
def build_client(options)
  settings = options[:client_settings] || {}
  scheme, port = options.values_at(:protocol, :port)

  # One URL per host, with any trailing slashes removed.
  endpoints = options[:hosts].map do |host|
    "#{scheme}://#{host}:#{port}#{settings[:path]}".gsub(/[\/]+$/,'')
  end

  transport_options = {
    :request => {:open_timeout => 0, :timeout => 60},  # ELB timeouts are set at 60
    :proxy => settings[:proxy],
  }

  @client_options = {
    :hosts => endpoints,
    :region => options[:region],
    :transport_options => transport_options,
    :transport_class => Elasticsearch::Transport::Transport::HTTP::AWS
  }

  internal_options = @client_options.merge(
    :aws_access_key_id => options[:aws_access_key_id],
    :aws_secret_access_key => options[:aws_secret_access_key]
  )

  user, password = options.values_at(:user, :password)
  if user && password
    token = Base64.strict_encode64(user + ":" + password)
    internal_options[:headers] = { "Authorization" => "Basic #{token}" }
  end

  Elasticsearch::Client.new(internal_options)
end
template_exists?(name)
# File lib/logstash/outputs/amazon_es/http_client.rb, line 108
# Checks whether an index template with the given name can be fetched.
#
# name - template name to look up.
#
# Returns true when the lookup succeeds, false when the client raises
# Elasticsearch::Transport::Transport::Errors::NotFound. Any other error
# raised by the lookup propagates to the caller.
def template_exists?(name)
  found = begin
    @client.indices.get_template(:name => name)
    true
  rescue Elasticsearch::Transport::Transport::Errors::NotFound
    false
  end

  found
end
template_put(name, template)
# File lib/logstash/outputs/amazon_es/http_client.rb, line 115
# Stores an index template through the client's indices API.
#
# name     - name to store the template under.
# template - template body to send.
#
# Returns whatever the client's put_template call returns.
def template_put(name, template)
  indices_api = @client.indices
  indices_api.put_template(:name => name, :body => template)
end