class LogStash::Outputs::ElasticSearchJavaPlugins::Protocols::NodeClient

Constants

CLIENT_MUTEX

Attributes

client_options[R]
settings[R]

Public Class Methods

clear_node_client() click to toggle source

For use in test helpers

# File lib/logstash/outputs/elasticsearch_java/protocol.rb, line 32
# Drops the cached node client so that the next call to the instance-level
# `client` builds a fresh one. For use in test helpers.
#
# CLIENT_MUTEX is locked directly here: `client_mutex_synchronize` is an
# instance method, so it cannot be called from this class-level method.
def self.clear_node_client
  CLIENT_MUTEX.synchronize { @@client = nil }
end
new(options={}) click to toggle source
# File lib/logstash/outputs/elasticsearch_java/protocol.rb, line 17
# Stores the effective options (the caller's options merged over
# DEFAULT_OPTIONS), grabs the Cabin logging channel and builds the node
# settings via create_settings.
#
# @param options [Hash] overrides applied on top of DEFAULT_OPTIONS
def initialize(options={})
  @client_options = DEFAULT_OPTIONS.merge(options)
  @logger = Cabin::Channel.get
  create_settings
end

Public Instance Methods

build_request(action, args, source) click to toggle source

def bulk

# File lib/logstash/outputs/elasticsearch_java/protocol.rb, line 129
# Builds the Elasticsearch Java request object for a single bulk action.
#
# @param action [String] "index", "delete", "create", "create_unless_exists"
#   or "update"
# @param args [Hash] action metadata; the keys read here are :_index, :_type,
#   :_id, :_routing and (for "update") :_upsert
# @param source the document body: passed to `source` for index/create and
#   to `doc` for update; not used for "delete"
# @return the IndexRequest, DeleteRequest or UpdateRequest that was built
# @raise [LogStash::ConfigurationError] if the action is not supported, or if
#   "create_unless_exists" / "update" is requested without an :_id
def build_request(action, args, source)
  case action
  when "index"
    request = org.elasticsearch.action.index.IndexRequest.new(args[:_index])
    request.id(args[:_id]) if args[:_id]
    request.routing(args[:_routing]) if args[:_routing]
    request.source(source)
  when "delete"
    request = org.elasticsearch.action.delete.DeleteRequest.new(args[:_index])
    request.id(args[:_id])
    request.routing(args[:_routing]) if args[:_routing]
  when "create"
    request = org.elasticsearch.action.index.IndexRequest.new(args[:_index])
    request.id(args[:_id]) if args[:_id]
    request.routing(args[:_routing]) if args[:_routing]
    request.source(source)
    request.opType("create")
  when "create_unless_exists"
    if args[:_id].nil?
      raise(LogStash::ConfigurationError, "Specifying action => 'create_unless_exists' without a document '_id' is not supported.")
    end

    request = org.elasticsearch.action.index.IndexRequest.new(args[:_index])
    request.id(args[:_id])
    request.routing(args[:_routing]) if args[:_routing]
    request.source(source)
    request.opType("create")
  when "update"
    if args[:_id].nil?
      raise(LogStash::ConfigurationError, "Specifying action => 'update' without a document '_id' is not supported.")
    end

    request = org.elasticsearch.action.update.UpdateRequest.new(args[:_index], args[:_type], args[:_id])
    request.routing(args[:_routing]) if args[:_routing]
    request.doc(source)
    if @client_options[:doc_as_upsert]
      request.docAsUpsert(true)
    else
      request.upsert(args[:_upsert]) if args[:_upsert]
    end
  else
    # Report the `action` parameter itself; there is no `action_name` in
    # scope, so interpolating that name would raise NameError instead.
    raise(LogStash::ConfigurationError, "action => '#{action}' is not currently supported.")
  end

  request.type(args[:_type]) if args[:_type]
  request
end
bulk(actions) click to toggle source
# File lib/logstash/outputs/elasticsearch_java/protocol.rb, line 116
# Sends a batch of actions to Elasticsearch as a single bulk request.
#
# @param actions [Array] entries of the form [action, action_metadata, source]
# @return the executed bulk response as converted by normalize_bulk_response
def bulk(actions)
  # Fold every action into one bulk builder obtained from the client.
  bulk_builder = actions.each_with_object(client.prepareBulk) do |(action, args, source), builder|
    builder.add(build_request(action, args, source))
  end

  normalize_bulk_response(bulk_builder.execute.actionGet)
end
client() click to toggle source
# File lib/logstash/outputs/elasticsearch_java/protocol.rb, line 27
# Returns the node client held in the @@client class variable, creating it
# with make_client on first use. The check-and-create step runs inside
# client_mutex_synchronize.
def client
  client_mutex_synchronize do
    @@client ||= make_client
  end
end
client_mutex_synchronize() { || ... } click to toggle source
# File lib/logstash/outputs/elasticsearch_java/protocol.rb, line 23
# Runs the given block while holding CLIENT_MUTEX and returns the block's
# value.
def client_mutex_synchronize
  CLIENT_MUTEX.synchronize do
    yield
  end
end
create_settings() click to toggle source
# File lib/logstash/outputs/elasticsearch_java/protocol.rb, line 36
# Builds the Elasticsearch settings builder for the node client and keeps it
# in @settings.
#
# Reads @client_options[:hosts] (optional) and
# @client_options[:client_settings] (optional key/value pairs applied last,
# so they can override the values set here).
#
# @return the settings builder stored in @settings
def create_settings
  @settings = builder = org.elasticsearch.common.settings.Settings.settingsBuilder()

  # With explicit hosts, switch multicast discovery off and list the hosts
  # for unicast discovery instead.
  if @client_options[:hosts]
    builder.put("discovery.zen.ping.multicast.enabled", false)
    builder.put("discovery.zen.ping.unicast.hosts", hosts(@client_options))
  end

  builder.put("node.client", true)
  builder.put("http.enabled", false)
  builder.put("path.home", Dir.pwd)

  overrides = @client_options[:client_settings]
  overrides.each { |key, value| builder.put(key, value) } if overrides

  @settings
end
hosts(options) click to toggle source
# File lib/logstash/outputs/elasticsearch_java/protocol.rb, line 56
# Builds the comma-separated "host:port" list used for unicast discovery.
#
# @param options [Hash] reads :hosts (one host, or an Array of hosts) and
#   :port (a single port, or a "first-last" range string)
# @return [String] the resulting host:port entries joined with ","
def hosts(options)
  configured = options[:hosts]
  port = options[:port]
  # Anything that is not exactly an Array is handled as a single host entry.
  candidates = configured.class == Array ? configured : [configured]

  entries = candidates.map do |host|
    if host.to_s =~ /^.+:.+$/
      # Already in host:port form, so options[:port] is ignored.
      host
    elsif port.to_s =~ /^\d+-\d+$/
      # Port ranges are 'host[port1-port2]' according to
      # http://www.elasticsearch.org/guide/reference/modules/discovery/zen/
      # However, it seems to only query the first port, so expand the range
      # into our own list of unicast hosts to scan.
      Range.new(*port.split("-")).collect { |p| "#{host}:#{p}" }
    else
      "#{host}:#{port}"
    end
  end

  entries.flatten.join(",")
end
make_client() click to toggle source
# File lib/logstash/outputs/elasticsearch_java/protocol.rb, line 111
# Creates a node from the current settings and returns that node's client.
#
# @return the client obtained from the newly built node
def make_client
  builder = org.elasticsearch.node.NodeBuilder.nodeBuilder
  configured = builder.settings(settings.build)
  configured.node.client
end
normalize_bulk_response(bulk_response) click to toggle source

Normalizes the Java response to a reasonable approximation of the HTTP data structure, for interop with the HTTP code.

# File lib/logstash/outputs/elasticsearch_java/protocol.rb, line 95
# Normalizes the Java bulk response to a reasonable approximation of the HTTP
# data structure, for interop with the HTTP code.
#
# @param bulk_response the Java bulk response; it must answer `has_failures`
#   and be enumerable over its per-item responses
# @return [Hash] {"errors" => false} when nothing failed; otherwise
#   {"errors" => true, "items" => items}, where each item is
#   [[op_type, {"status" => ..., "message" => ...}]]
def normalize_bulk_response(bulk_response)
  # Items are only reported for a failed bulk, so do not build them when
  # every action succeeded.
  return {"errors" => false} unless bulk_response.has_failures()

  # TODO(talevy): parse item response objects to retrieve correct 200 (OK) or 201(created) status codes
  items = bulk_response.map do |item|
    outcome =
      if item.is_failed
        {"status" => item.get_failure.get_status.get_status, "message" => item.failureMessage}
      else
        {"status" => 200, "message" => "OK"}
      end
    [[item.get_op_type, outcome]]
  end

  {"errors" => true, "items" => items}
end
template_exists?(name) click to toggle source

def build_request

# File lib/logstash/outputs/elasticsearch_java/protocol.rb, line 179
# Asks the cluster for index templates matching the given name.
#
# @param name the template name handed to prepareGetTemplates
# @return [Boolean] true when the lookup returns at least one template
def template_exists?(name)
  lookup = client.admin.indices.prepareGetTemplates(name)
  found = lookup.execute.actionGet.getIndexTemplates
  !found.isEmpty
end
template_install(name, template, force=false) click to toggle source
# File lib/logstash/outputs/elasticsearch_java/protocol.rb, line 188
# Installs an index template unless one with the same name already exists.
#
# @param name the template name
# @param template the template body handed to template_put
# @param force [Boolean] when true, install without checking for an
#   existing template
# @return [nil] when the install is skipped; otherwise whatever template_put
#   returns
def template_install(name, template, force=false)
  # Test `force` first: a forced install overwrites regardless, so the
  # existence lookup against the cluster would be a wasted round trip.
  if !force && template_exists?(name)
    @logger.debug("Found existing Elasticsearch template. Skipping template management", :name => name)
    return
  end
  template_put(name, template)
end
template_put(name, template) click to toggle source
# File lib/logstash/outputs/elasticsearch_java/protocol.rb, line 196
# Writes an index template to the cluster.
#
# @param name the template name handed to preparePutTemplate
# @param template the template body; serialized with LogStash::Json.dump
# @raise [RuntimeError] if the response is not acknowledged
def template_put(name, template)
  put_builder = client.admin.indices.preparePutTemplate(name)
  put_builder = put_builder.setSource(LogStash::Json.dump(template))
  ack = put_builder.execute.actionGet

  raise "Could not index template!" unless ack.isAcknowledged
end