class LogStash::Outputs::Solr
Constants
- MODE_SOLRCLOUD
- MODE_STANDALONE
Public Instance Methods
close()
click to toggle source
# File lib/logstash/outputs/solr.rb, line 133
# Shuts the output down.
#
# Performs a final buffer flush so that events accepted by +receive+ but not
# yet indexed are written before shutdown, then closes the ZooKeeper handle
# if one is held in @zk.
#
# The handle is released in an +ensure+ clause so it is closed even when the
# final flush raises, and @zk is reset to nil afterwards so that calling
# +close+ more than once does not close the same handle twice.
#
# NOTE(review): relies on +buffer_flush+ being available on this object
# (presumably from the same buffering mixin that supplies +buffer_receive+
# and +buffer_initialize+) and on +register+ having run first - confirm.
def close
  buffer_flush(:final => true)
ensure
  unless @zk.nil?
    @zk.close
    @zk = nil
  end
end
flush(events, close=false)
click to toggle source
# File lib/logstash/outputs/solr.rb, line 83
# Indexes one batch of events into Solr.
#
# Each event is converted with +to_hash+ and turned into a Solr document:
# * a random UUID is stored under @unique_key when that key is absent;
# * the value of '@timestamp' is copied to @timestamp_field when that key
#   is absent;
# * when @collection_field is set and present in the document, its value
#   selects the target collection and the field is removed from the
#   document; otherwise @collection is used.
#
# Documents are grouped per collection and sent with one +add+ call per
# collection, either through a per-collection standalone client (cached in
# @solr_std) or through @solr_cloud, depending on @mode.
#
# events - enumerable of objects responding to +to_hash+.
# close  - accepted for interface compatibility; not used by this method.
#
# Errors derived from StandardError are logged at warn level and not
# re-raised. Signals and exit requests (Interrupt, SystemExit, ...) are
# deliberately allowed to propagate instead of being swallowed.
def flush(events, close=false)
  documents_per_col = {}

  events.each do |event|
    # Work on a shallow copy: the hash returned by to_hash may be owned by
    # the event, and the edits below must not leak back into it.
    document = event.to_hash.dup

    document[@unique_key] = SecureRandom.uuid unless document.has_key?(@unique_key)
    document[@timestamp_field] = document['@timestamp'] unless document.has_key?(@timestamp_field)

    @logger.debug 'Record: %s' % document.inspect

    collection = @collection
    if @collection_field && document.has_key?(@collection_field)
      # The routing field only selects the collection; it is not indexed.
      collection = document.delete(@collection_field)
    end

    (documents_per_col[collection] ||= []) << document
  end

  params = {}
  params[:commit] = true if @commit
  params[:commitWithin] = @commitWithin

  documents_per_col.each do |collection, documents|
    if @mode == MODE_STANDALONE
      # Replace the last path segment of the configured URL with the
      # target collection name.
      collection_url = @url.rpartition('/')[0] + '/' + collection
      @solr_std[collection] ||= RSolr.connect :url => collection_url
      @solr_std[collection].add documents, :params => params
      @logger.info 'Added %d document(s) to Solr at "%s"' % [documents.count, collection_url]
    elsif @mode == MODE_SOLRCLOUD
      @solr_cloud.add documents, collection: collection, :params => params
      @logger.info 'Added %d document(s) to "%s" collection' % [documents.count, collection]
    else
      # Neither mode is active: say so instead of dropping the batch silently.
      @logger.warn 'Dropped %d document(s) for "%s" collection: no Solr mode configured' % [documents.count, collection]
    end
  end
rescue StandardError => e
  @logger.warn("An error occurred while indexing", :exception => e.inspect)
end
receive(event)
click to toggle source
# File lib/logstash/outputs/solr.rb, line 78
# Accepts a single event for output.
#
# The event is not indexed here; it is handed to +buffer_receive+, and the
# return value of that call is returned unchanged.
#
# event - the event to enqueue.
def receive(event)
  buffer_receive(event)
end
register()
click to toggle source
# File lib/logstash/outputs/solr.rb, line 50
# Prepares the output: picks the connection mode, opens the matching Solr
# client and initializes the event buffer.
#
# Mode selection:
# * @url set      -> MODE_STANDALONE; a client for @url is stored in
#                    @solr_std under @collection.
# * @zk_host set  -> MODE_SOLRCLOUD (only when @url is nil); a ZooKeeper
#                    handle is kept in @zk and a cloud-aware client in
#                    @solr_cloud, with 60 s read and open timeouts.
# * neither set   -> raises, because no mode would be active and no
#                    document could ever be sent.
#
# Finally the buffer is configured with @flush_size as the maximum number
# of items and @idle_flush_time as the maximum interval.
#
# Raises LogStash::ConfigurationError when neither @url nor @zk_host is set.
def register
  @solr_std = {}
  @solr_cloud = nil
  @zk = nil

  if !@url.nil?
    @mode = MODE_STANDALONE
    @solr_std[@collection] = RSolr.connect :url => @url
  elsif !@zk_host.nil?
    @mode = MODE_SOLRCLOUD
    @zk = ZK.new(@zk_host)
    cloud_connection = RSolr::Cloud::Connection.new(@zk)
    @solr_cloud = RSolr::Client.new(cloud_connection, read_timeout: 60, open_timeout: 60)
  else
    @mode = nil
    # Fail at startup rather than accept events that can never be indexed.
    raise LogStash::ConfigurationError, 'Solr output requires either "url" or "zk_host" to be set'
  end

  buffer_initialize(
    :max_items => @flush_size,
    :max_interval => @idle_flush_time,
    :logger => @logger
  )
end