class Fluent::Plugin::MongoTailInput
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mongo_tail.rb, line 50
# Runs the superclass initializer, then starts the plugin with two
# separate, empty option hashes.
def initialize
  super
  @client_options, @connection_options = {}, {}
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mongo_tail.rb, line 57
# Validates the plugin settings after the superclass has processed +conf+.
#
# Checks, in order: a tag source is present, exactly one of 'database' and
# 'url' is given, and any batch size is positive. Along the way it loads the
# stored last id (only when an id store file is configured) and records the
# ssl setting in the connection options.
#
# @param conf the configuration object handed to the superclass
# @raise [Fluent::ConfigError] when any of the checks above fails
def configure(conf)
  super

  unless @tag || @tag_key
    raise Fluent::ConfigError, "'tag' or 'tag_key' option is required on mongo_tail input"
  end
  raise Fluent::ConfigError, "Both 'database' and 'url' can not be set" if @database && @url
  raise Fluent::ConfigError, "One of 'database' or 'url' must be specified" unless @database || @url

  @last_id =
    if @id_store_file
      get_last_id
    else
      nil
    end
  @connection_options[:ssl] = @ssl

  raise Fluent::ConfigError, "Batch size must be positive." if @batch_size && @batch_size <= 0

  configure_logger(@mongo_log_level)
end
run()
click to toggle source
# File lib/fluent/plugin/in_mongo_tail.rb, line 104
# Fetches documents newer than the last seen id and hands them to
# process_documents.
#
# When @last_id is set, only documents whose '_id' is greater than it are
# queried; when @batch_size is set, the query is limited to that many
# documents. Nothing is processed if the query reports no documents.
#
# Any StandardError is still swallowed so that one failed poll does not
# propagate to the caller, but it is now reported through the plugin logger
# instead of being dropped silently.
def run
  option = {}
  option['_id'] = { '$gt' => BSON::ObjectId(@last_id) } if @last_id

  documents = @collection.find(option)
  documents = documents.limit(@batch_size) if @batch_size
  process_documents(documents) if documents.count >= 1
rescue => e
  # Best-effort polling: report and carry on rather than raise.
  log.warn("mongo_tail: failed to fetch documents: #{e.class}: #{e.message}")
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mongo_tail.rb, line 93
# Persists the last seen id (when an id store file is configured), closes
# the id store file and the Mongo client, then defers to the superclass.
#
# @file and @client are only touched when they were actually created, so a
# shutdown that follows an incomplete start does not fail on nil and still
# reaches the superclass shutdown.
def shutdown
  if @id_store_file && @file
    save_last_id
    @file.close
  end
  @client.close if @client

  super
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mongo_tail.rb, line 82
# Opens the id store file (when configured), obtains the collection, picks
# the resume position and registers the periodic watcher that calls #run
# every @wait_time.
def start
  super

  @file = get_id_store_file if @id_store_file
  @collection = get_collection

  # Resume tailing from last inserted id.
  # Because tailable option is obsoleted since mongo driver 2.0.
  unless @id_store_file
    # Look the id up once: get_last_inserted_id walks the whole collection,
    # so calling it for the check and again for the value doubles that cost.
    last_inserted_id = get_last_inserted_id
    @last_id = last_inserted_id if last_inserted_id
  end

  timer_execute(:in_mongo_tail_watcher, @wait_time, &method(:run))
end
Private Instance Methods
client()
click to toggle source
# File lib/fluent/plugin/in_mongo_tail.rb, line 120
# Builds a Mongo::Client for node_string.
#
# The database, user and password settings are copied into @client_options
# first, each only when it is set.
#
# @return the newly created Mongo::Client
def client
  { database: @database, user: @user, password: @password }.each do |key, value|
    @client_options[key] = value if value
  end

  Mongo::Client.new(node_string, @client_options)
end
get_collection()
click to toggle source
# File lib/fluent/plugin/in_mongo_tail.rb, line 127
# Creates the client, passes it through authenticate, and returns the
# collection named by @collection.
#
# @client is assigned before authentication as well as after it, so the
# instance keeps a reference to the client even if authenticate raises.
def get_collection
  connection = client
  @client = connection
  @client = authenticate(connection)
  @client[@collection.to_s]
end
get_id_store_file()
click to toggle source
# File lib/fluent/plugin/in_mongo_tail.rb, line 186
# Opens the id store file for writing and returns the handle.
#
# Mode 'w' creates the file if needed and truncates existing content.
# Sync mode is switched on so each write is passed to the operating system
# immediately instead of sitting in Ruby's buffer; the previous bare
# `file.sync` only read the flag and changed nothing.
#
# @return [File] the open, sync-enabled file
def get_id_store_file
  file = File.open(@id_store_file, 'w')
  file.sync = true
  file
end
get_last_id()
click to toggle source
# File lib/fluent/plugin/in_mongo_tail.rb, line 192
# Reads the last processed id from the id store file.
#
# Surrounding whitespace (for example a trailing newline left by an editor)
# is stripped before the value is parsed, so an otherwise valid id is not
# thrown away.
#
# @return [String, nil] the stored id as a string; nil when the file does
#   not exist, cannot be read, or does not hold a value BSON::ObjectId accepts
def get_last_id
  return nil unless File.exist?(@id_store_file)

  begin
    BSON::ObjectId(File.read(@id_store_file).strip).to_s
  rescue StandardError
    # Unreadable or invalid content is treated the same as "no stored id".
    nil
  end
end
get_last_inserted_id()
click to toggle source
# File lib/fluent/plugin/in_mongo_tail.rb, line 173
# Walks every document returned by an unfiltered find and returns the '_id'
# of the last one that carries a truthy '_id'.
#
# Each visited document has its '_id' key removed as a side effect.
#
# @return the last '_id' seen, or nil when there are no documents
def get_last_inserted_id
  cursor = @collection.find
  return nil unless cursor.count >= 1

  newest = nil
  cursor.each do |doc|
    id = doc.delete('_id')
    newest = id if id
  end
  newest
end
node_string()
click to toggle source
# File lib/fluent/plugin/in_mongo_tail.rb, line 133
# Chooses the address argument for the Mongo client.
#
# @return [Array<String>, Object, nil] a one-element array holding
#   "host:port" when @database is set, otherwise @url when it is set,
#   otherwise nil
def node_string
  if @database
    ["#{@host}:#{@port}"]
  elsif @url
    @url
  end
end
process_documents(documents)
click to toggle source
# File lib/fluent/plugin/in_mongo_tail.rb, line 142
# Converts documents into events and emits them through the router.
#
# For each document:
# * the event time is the value stored under @time_key (converted with
#   to_i) or Fluent::Engine.now when @time_key is unset or the value is nil;
# * the tag is the value stored under @tag_key ('mongo.missing_tag' when it
#   is nil) or the configured @tag when @tag_key is unset;
# * every key listed in @object_id_keys is replaced by its string form;
# * '_id' is removed, remembered in @last_id, copied to '_id_str', and
#   saved to the id store file when one is configured.
#
# Events are collected into one stream per tag and each stream is emitted
# under its own tag. Previously a single stream was emitted under whichever
# tag the last document produced, and @tag itself was overwritten; @tag is
# now left untouched. Nothing is emitted when there are no documents.
#
# @param documents an enumerable of hash-like documents; they are modified
#   in place
def process_documents(documents)
  streams = {}

  documents.each do |doc|
    stored_time = @time_key ? doc.delete(@time_key) : nil
    time = stored_time.nil? ? Fluent::Engine.now : stored_time.to_i

    tag = @tag
    if @tag_key
      stored_tag = doc.delete(@tag_key)
      tag = stored_tag.nil? ? 'mongo.missing_tag' : stored_tag
    end

    if @object_id_keys
      @object_id_keys.each { |id_key| doc[id_key] = doc[id_key].to_s }
    end

    if (id = doc.delete('_id'))
      @last_id = id.to_s
      doc['_id_str'] = @last_id
      save_last_id if @id_store_file
    end

    (streams[tag] ||= Fluent::MultiEventStream.new).add(time, doc)
  end

  # Hash iteration follows insertion order, so tags are emitted in the
  # order they were first seen.
  streams.each { |tag, es| router.emit_stream(tag, es) }
end
save_last_id()
click to toggle source
# File lib/fluent/plugin/in_mongo_tail.rb, line 200
# Overwrites the start of the id store file with the current @last_id.
#
# The file is not truncated; only the bytes covered by the new value change.
#
# @return the result of the write call
def save_last_id
  @file.seek(0, IO::SEEK_SET)
  @file.write(@last_id)
end