class LogStash::Outputs::Mongodb
This output writes events to MongoDB. Each event is converted to a BSON document and inserted into the configured collection, either one document at a time or, when bulk mode is enabled, in buffered batches.
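The following sketch shows one hypothetical way to exercise the output directly from Ruby, in the style of plugin specs. It assumes a full Logstash runtime and the logstash-output-mongodb gem are on the load path; the event fields and collection pattern are illustrative.

# A minimal, spec-style sketch; assumes Logstash and the plugin gem are installed.
require "logstash/outputs/mongodb"
require "logstash/event"

output = LogStash::Outputs::Mongodb.new(
  "uri"        => "mongodb://localhost:27017",
  "database"   => "logstash",
  "collection" => "logs-%{type}"   # resolved per event via event.sprintf in receive
)
output.register
output.receive(LogStash::Event.new("message" => "hello", "type" => "test"))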
Public Instance Methods
receive(event)
# File lib/logstash/outputs/mongodb.rb, line 74
def receive(event)
  begin
    # Our timestamp object now has a to_bson method, using it here
    # {}.merge(other) so we don't taint the event hash innards
    document = {}.merge(event.to_hash)
    if !@isodate
      # not using timestamp.to_bson
      document["@timestamp"] = event.timestamp.to_json
    end
    if @generateId
      document["_id"] = BSON::ObjectId.new(nil, event.timestamp)
    end
    if @bulk
      @@mutex.synchronize do
        collection = event.sprintf(@collection)
        if(!@documents[collection])
          @documents[collection] = []
        end
        @documents[collection].push(document)
        if(@documents[collection].length >= @bulk_size)
          @db[collection].insert_many(@documents[collection])
          @documents.delete(collection)
        end
      end
    else
      @db[event.sprintf(@collection)].insert_one(document)
    end
  rescue => e
    @logger.warn("Failed to send event to MongoDB", :event => event,
                 :exception => e, :backtrace => e.backtrace)
    if e.message =~ /^E11000/
      # On a duplicate key error, skip the insert.
      # We could check if the duplicate key err is the _id key
      # and generate a new primary key.
      # If the duplicate key error is on another field, we have no way
      # to fix the issue.
    else
      sleep @retry_delay
      retry
    end
  end
end
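The bulk branch above buffers documents per collection under a mutex and flushes a collection's buffer as soon as it reaches @bulk_size. Here is a minimal standalone sketch of that size-triggered path using the mongo gem; the names (buffer_and_flush, "logs") and the bulk size of 3 are illustrative stand-ins for the plugin's settings, and it assumes a mongod listening on localhost.

# Size-triggered bulk insert sketch; assumes the mongo gem and a local mongod.
require "mongo"

mutex     = Mutex.new
buffers   = {}                    # collection name => array of pending documents
bulk_size = 3                     # stands in for @bulk_size
db = Mongo::Client.new("mongodb://localhost:27017").use("logstash")

def buffer_and_flush(db, mutex, buffers, bulk_size, name, doc)
  mutex.synchronize do
    (buffers[name] ||= []) << doc
    if buffers[name].length >= bulk_size
      # delete returns the buffered batch and empties the slot in one step
      db[name].insert_many(buffers.delete(name))
    end
  end
end

buffer_and_flush(db, mutex, buffers, bulk_size, "logs", { "message" => "hi" })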
register()
# File lib/logstash/outputs/mongodb.rb, line 50
def register
  Mongo::Logger.logger = @logger
  conn = Mongo::Client.new(@uri)
  @db = conn.use(@database)

  if @bulk_size > 1000
    raise LogStash::ConfigurationError, "Bulk size must be lower than '1000', currently '#{@bulk_size}'"
  end

  @documents = {}

  Thread.new do
    loop do
      sleep @bulk_interval

      @@mutex.synchronize do
        @documents.each do |collection, values|
          if values.length > 0
            @db[collection].insert_many(values)
            @documents.delete(collection)
          end
        end
      end
    end
  end
end
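Besides the size-triggered flush in receive, register starts a background thread that flushes every buffer on a fixed interval, so small trickles of events still reach MongoDB. Below is a minimal standalone sketch of that interval flusher using the mongo gem; the 5-second interval and names are illustrative, and it iterates over a snapshot of the keys so deleting a buffer mid-loop is safe (a slightly more defensive variant than deleting while iterating the hash directly, as the plugin does).

# Interval-based flusher sketch; assumes the mongo gem and a local mongod.
require "mongo"

mutex   = Mutex.new
buffers = {}                      # collection name => array of pending documents
db      = Mongo::Client.new("mongodb://localhost:27017").use("logstash")

flusher = Thread.new do
  loop do
    sleep 5                       # stands in for @bulk_interval
    mutex.synchronize do
      buffers.keys.each do |name| # key snapshot: safe to delete entries below
        docs = buffers.delete(name)
        db[name].insert_many(docs) unless docs.nil? || docs.empty?
      end
    end
  end
end

mutex.synchronize { (buffers["logs"] ||= []) << { "message" => "flushed later" } }
sleep 6                           # give the flusher one cycle, then the insert lands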