class LogStash::Outputs::ElasticSearchJavaPlugins::Protocols::NodeClient
Constants
- CLIENT_MUTEX
Attributes
client_options[R]
settings[R]
Public Class Methods
clear_node_client()
click to toggle source
For use in test helpers: resets the shared node client to nil.
# File lib/logstash/outputs/elasticsearch_java/protocol.rb, line 32
# For use in test helpers: sets the shared @@client back to nil while holding
# CLIENT_MUTEX.
#
# The lock is taken on CLIENT_MUTEX directly because client_mutex_synchronize
# is defined as an instance method and cannot be called with the class as
# receiver.
def self.clear_node_client
  CLIENT_MUTEX.synchronize { @@client = nil }
end
new(options={})
click to toggle source
# File lib/logstash/outputs/elasticsearch_java/protocol.rb, line 17
# Merges the caller's options over DEFAULT_OPTIONS, obtains the Cabin logging
# channel and builds the node settings.
#
# options - Hash of client options; its entries override DEFAULT_OPTIONS.
def initialize(options={})
  @client_options = DEFAULT_OPTIONS.merge(options)
  @logger = Cabin::Channel.get
  create_settings
end
Public Instance Methods
build_request(action, args, source)
click to toggle source
def bulk
# File lib/logstash/outputs/elasticsearch_java/protocol.rb, line 129
# Builds the Elasticsearch Java request object for a single bulk action.
#
# action - String action name: "index", "delete", "create",
#          "create_unless_exists" or "update".
# args   - Hash of action metadata. Reads :_index, :_id, :_routing, :_type
#          and, for updates, :_upsert.
# source - document body passed to the request's source (or doc, for updates).
#
# Returns the request object.
# Raises LogStash::ConfigurationError when the action is not one of the
# supported names, or when "create_unless_exists" / "update" is requested
# without an :_id.
def build_request(action, args, source)
  case action
  when "index"
    request = org.elasticsearch.action.index.IndexRequest.new(args[:_index])
    request.id(args[:_id]) if args[:_id]
    request.routing(args[:_routing]) if args[:_routing]
    request.source(source)
  when "delete"
    request = org.elasticsearch.action.delete.DeleteRequest.new(args[:_index])
    request.id(args[:_id])
    request.routing(args[:_routing]) if args[:_routing]
  when "create"
    request = org.elasticsearch.action.index.IndexRequest.new(args[:_index])
    request.id(args[:_id]) if args[:_id]
    request.routing(args[:_routing]) if args[:_routing]
    request.source(source)
    request.opType("create")
  when "create_unless_exists"
    if args[:_id].nil?
      raise(LogStash::ConfigurationError, "Specifying action => 'create_unless_exists' without a document '_id' is not supported.")
    end
    request = org.elasticsearch.action.index.IndexRequest.new(args[:_index])
    request.id(args[:_id])
    request.routing(args[:_routing]) if args[:_routing]
    request.source(source)
    request.opType("create")
  when "update"
    if args[:_id].nil?
      raise(LogStash::ConfigurationError, "Specifying action => 'update' without a document '_id' is not supported.")
    end
    request = org.elasticsearch.action.update.UpdateRequest.new(args[:_index], args[:_type], args[:_id])
    request.routing(args[:_routing]) if args[:_routing]
    request.doc(source)
    if @client_options[:doc_as_upsert]
      request.docAsUpsert(true)
    else
      request.upsert(args[:_upsert]) if args[:_upsert]
    end
  else
    # Interpolate the `action` parameter; the name `action_name` is not
    # defined in this method.
    raise(LogStash::ConfigurationError, "action => '#{action}' is not currently supported.")
  end # case action

  request.type(args[:_type]) if args[:_type]
  request
end
bulk(actions)
click to toggle source
# File lib/logstash/outputs/elasticsearch_java/protocol.rb, line 116
# Sends a batch of actions through the node client as one bulk request.
#
# actions - Array of [ action, action_metadata, source ] triples; each triple
#           is turned into a request by build_request.
#
# Returns the result of normalize_bulk_response for the bulk response.
def bulk(actions)
  bulk_builder = client.prepareBulk
  actions.each do |action, args, source|
    bulk_builder.add(build_request(action, args, source))
  end

  normalize_bulk_response(bulk_builder.execute.actionGet())
end
client()
click to toggle source
# File lib/logstash/outputs/elasticsearch_java/protocol.rb, line 27
# Returns the client held in the @@client class variable, calling make_client
# to create it when it is not set yet. The lookup and the assignment both run
# inside client_mutex_synchronize.
def client
  client_mutex_synchronize do
    @@client ||= make_client
  end
end
client_mutex_synchronize() { || ... }
click to toggle source
# File lib/logstash/outputs/elasticsearch_java/protocol.rb, line 23
# Runs the given block while holding CLIENT_MUTEX and returns the block's
# value.
def client_mutex_synchronize
  CLIENT_MUTEX.synchronize do
    yield
  end
end
create_settings()
click to toggle source
# File lib/logstash/outputs/elasticsearch_java/protocol.rb, line 36
# Creates the Elasticsearch settings builder, stores it in @settings and
# fills it from @client_options.
#
# When :hosts is set, multicast discovery is switched off and the unicast host
# list comes from hosts(). Entries in :client_settings are applied last.
#
# Returns the settings builder.
def create_settings
  @settings = org.elasticsearch.common.settings.Settings.settingsBuilder()

  if @client_options[:hosts]
    @settings.put("discovery.zen.ping.multicast.enabled", false)
    @settings.put("discovery.zen.ping.unicast.hosts", hosts(@client_options))
  end

  [["node.client", true], ["http.enabled", false], ["path.home", Dir.pwd]].each do |key, value|
    @settings.put(key, value)
  end

  overrides = @client_options[:client_settings]
  overrides.each { |key, value| @settings.put(key, value) } if overrides

  @settings
end
hosts(options)
click to toggle source
# File lib/logstash/outputs/elasticsearch_java/protocol.rb, line 56
# Builds the comma-separated unicast host list.
# http://www.elasticsearch.org/guide/reference/modules/discovery/zen/
#
# options - Hash read for:
#           :hosts - a single host or an Array of hosts. A host already in
#                    "host:port" form is used as-is and :port is ignored.
#           :port  - a single port, or a "first-last" range String.
#
# Returns a String such as "a:9300,a:9301,b:9300".
def hosts(options)
  port = options[:port]

  # port ranges are 'host[port1-port2]' according to
  # http://www.elasticsearch.org/guide/reference/modules/discovery/zen/
  # However, it seems to only query the first port.
  # So generate our own list of unicast hosts to scan.
  ports =
    if port.to_s =~ /^\d+-\d+$/
      Range.new(*port.to_s.split("-")).to_a
    else
      [port]
    end

  # A single host and a list of hosts share one code path.
  candidates = options[:hosts].is_a?(Array) ? options[:hosts] : [options[:hosts]]

  entries = candidates.map do |host|
    if host.to_s =~ /^.+:.+$/
      # For host in format: host:port, ignore options[:port]
      host
    else
      ports.map { |p| "#{host}:#{p}" }
    end
  end

  entries.flatten.join(",")
end
make_client()
click to toggle source
# File lib/logstash/outputs/elasticsearch_java/protocol.rb, line 111
# Builds a node from the current settings via NodeBuilder and returns that
# node's client.
def make_client
  builder = org.elasticsearch.node.NodeBuilder.nodeBuilder
  node = builder.settings(settings.build).node()
  node.client()
end
normalize_bulk_response(bulk_response)
click to toggle source
Normalizes the Java response to a reasonable approximation of the HTTP data structure, for interop with the HTTP code.
# File lib/logstash/outputs/elasticsearch_java/protocol.rb, line 95
# Normalizes the Java bulk response to a reasonable approximation of the HTTP
# data structure, for interop with the HTTP code.
#
# bulk_response - the bulk response; must respond to has_failures and map.
#
# Returns {"errors" => false} when nothing failed. Otherwise returns
# {"errors" => true, "items" => items}, where each item is
# [[op_type, {"status" => ..., "message" => ...}]].
def normalize_bulk_response(bulk_response)
  # TODO(talevy): parse item response objects to retrieve correct 200 (OK) or 201(created) status codes
  # The per-item list is only needed when at least one item failed.
  return {"errors" => false} unless bulk_response.has_failures()

  items = bulk_response.map do |item|
    outcome =
      if item.is_failed
        {"status" => item.get_failure.get_status.get_status, "message" => item.failureMessage}
      else
        {"status" => 200, "message" => "OK"}
      end
    [[item.get_op_type, outcome]]
  end

  {"errors" => true, "items" => items}
end
template_exists?(name)
click to toggle source
def build_request
# File lib/logstash/outputs/elasticsearch_java/protocol.rb, line 179
# Asks the cluster for index templates matching the given name.
#
# name - template name passed to prepareGetTemplates.
#
# Returns true when the returned template list is not empty.
def template_exists?(name)
  lookup = client.admin.indices.prepareGetTemplates(name)
  found = lookup.execute().actionGet().getIndexTemplates()
  !found.isEmpty
end
template_install(name, template, force=false)
click to toggle source
# File lib/logstash/outputs/elasticsearch_java/protocol.rb, line 188
# Installs the template unless one with the same name already exists.
#
# name     - template name.
# template - template body, handed to template_put.
# force    - when true, the template is put even if it already exists.
#
# Returns nil when the existing template is kept; otherwise the result of
# template_put.
def template_install(name, template, force=false)
  # Existence is always checked first, even when force is set.
  already_present = template_exists?(name)
  return template_put(name, template) if force || !already_present

  @logger.debug("Found existing Elasticsearch template. Skipping template management", :name => name)
  nil
end
template_put(name, template)
click to toggle source
# File lib/logstash/outputs/elasticsearch_java/protocol.rb, line 196
# Serializes the template with LogStash::Json and stores it under the given
# name through the indices admin client.
#
# name     - template name passed to preparePutTemplate.
# template - template object, serialized with LogStash::Json.dump.
#
# Raises RuntimeError when the response is not acknowledged.
def template_put(name, template)
  put_request = client.admin.indices.preparePutTemplate(name)
  put_request = put_request.setSource(LogStash::Json.dump(template))
  response = put_request.execute().actionGet()

  raise "Could not index template!" unless response.isAcknowledged
end