class LogStash::Outputs::Solr

An Solr output that send data to Apache Solr.

Constants

MODE_SOLRCLOUD
MODE_STANDALONE

Public Instance Methods

close() click to toggle source
# File lib/logstash/outputs/solr.rb, line 133
def close
  unless @zk.nil? then
    @zk.close
  end
end
flush(events, close=false) click to toggle source
# File lib/logstash/outputs/solr.rb, line 83
def flush(events, close=false)
  documents_per_col = {}

  events.each do |event|
    document = event.to_hash()

    unless document.has_key?(@unique_key) then
      document.merge!({@unique_key => SecureRandom.uuid})
    end
    
    unless document.has_key?(@timestamp_field) then
      document.merge!({@timestamp_field => document['@timestamp']})
    end
    
    @logger.debug 'Record: %s' % document.inspect
    
    collection = @collection
    if @collection_field and document.has_key?(@collection_field) then
      collection = document[@collection_field]
      document.delete(@collection_field)
    end
    
    documents = documents_per_col.fetch(collection, [])
    documents.push(document)
    documents_per_col[collection] = documents
  end

  params = {}
  if @commit
    params[:commit] = true
  end  
  params[:commitWithin] = @commitWithin
  
  documents_per_col.each do |collection, documents|
    if @mode == MODE_STANDALONE then
      collection_url = @url.rpartition('/')[0] + '/' + collection
      @solr_std[collection] ||= RSolr.connect :url => collection_url
      @solr_std[collection].add documents, :params => params
      @logger.info 'Added %d document(s) to Solr at "%s"' % [documents.count, collection_url]
    elsif @mode == MODE_SOLRCLOUD then
      @solr_cloud.add documents, collection: collection, :params => params
      @logger.info 'Added %d document(s) to "%s" collection' % [documents.count, collection]
    end
  end

  rescue Exception => e
    @logger.warn("An error occurred while indexing", :exception => e.inspect)
end
receive(event) click to toggle source
# File lib/logstash/outputs/solr.rb, line 78
def receive(event)
  buffer_receive(event)
end
register() click to toggle source
# File lib/logstash/outputs/solr.rb, line 50
def register
  @mode = nil
  if ! @url.nil? then
    @mode = MODE_STANDALONE
  elsif ! @zk_host.nil?
    @mode = MODE_SOLRCLOUD
  end

  @solr_std = {}
  @solr_cloud = nil
  @zk = nil

  if @mode == MODE_STANDALONE then
    @solr_std[@collection] = RSolr.connect :url => @url
  elsif @mode == MODE_SOLRCLOUD then
    @zk = ZK.new(@zk_host)
    cloud_connection = RSolr::Cloud::Connection.new(@zk)
    @solr_cloud = RSolr::Client.new(cloud_connection, read_timeout: 60, open_timeout: 60)
  end

  buffer_initialize(
    :max_items => @flush_size,
    :max_interval => @idle_flush_time,
    :logger => @logger
  )
end