class LogStash::Outputs::Mongodb

This output writes events to MongoDB.

Public Instance Methods

receive(event) click to toggle source
# File lib/logstash/outputs/mongodb.rb, line 74
def receive(event)
  begin
    # Our timestamp object now has a to_bson method, using it here
    # {}.merge(other) so we don't taint the event hash innards
    document = {}.merge(event.to_hash)
    if !@isodate
      # not using timestamp.to_bson
      document["@timestamp"] = event.timestamp.to_json
    end
    if @generateId
      document["_id"] = BSON::ObjectId.new(nil, event.timestamp)
    end
    if @bulk
      @@mutex.synchronize do
        collection = event.sprintf(@collection)
        if(!@documents[collection])
          @documents[collection] = []
        end
        @documents[collection].push(document)

        if(@documents[collection].length >= @bulk_size)
          @db[collection].insert_many(@documents[collection])
          @documents.delete(collection)
        end
      end
    else
      @db[event.sprintf(@collection)].insert_one(document)
    end

  rescue => e
    @logger.warn("Failed to send event to MongoDB", :event => event, :exception => e,
                 :backtrace => e.backtrace)
    if e.message =~ /^E11000/
        # On a duplicate key error, skip the insert.
        # We could check if the duplicate key err is the _id key
        # and generate a new primary key.
        # If the duplicate key error is on another field, we have no way
        # to fix the issue.
    else
      sleep @retry_delay
      retry
    end
  end
end
register() click to toggle source
# File lib/logstash/outputs/mongodb.rb, line 50
def register
  Mongo::Logger.logger = @logger
  conn = Mongo::Client.new(@uri)
  @db = conn.use(@database)

  if @bulk_size > 1000
    raise LogStash::ConfigurationError, "Bulk size must be lower than '1000', currently '#{@bulk_size}'"
  end
  @documents = {}
  Thread.new do
    loop do
      sleep @bulk_interval
      @@mutex.synchronize do
        @documents.each do |collection, values|
          if values.length > 0
            @db[collection].insert_many(values)
            @documents.delete(collection)
          end
        end
      end
    end
  end
end