class Fluent::Plugin::MongoTailInput

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mongo_tail.rb, line 50
# Builds the plugin instance and seeds the two option hashes that later
# steps fill in.
def initialize
  super

  # Two independent empty hashes (never the same object).
  @connection_options, @client_options = {}, {}
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mongo_tail.rb, line 57
# Validates the settings once the superclass has processed +conf+ and
# prepares the state derived from them.
#
# conf - configuration object, passed through to the superclass.
#
# Raises Fluent::ConfigError when neither 'tag' nor 'tag_key' is set, when
# 'database' and 'url' are both set or both missing, or when a batch size is
# given that is not positive.
def configure(conf)
  super

  unless @tag || @tag_key
    raise Fluent::ConfigError, "'tag' or 'tag_key' option is required on mongo_tail input"
  end

  # Exactly one of 'database' / 'url' must select the connection target.
  raise Fluent::ConfigError, "Both 'database' and 'url' can not be set" if @database && @url
  raise Fluent::ConfigError, "One of 'database' or 'url' must be specified" unless @database || @url

  # A stored id is only looked up when an id store file is configured.
  @last_id = (get_last_id if @id_store_file)
  @connection_options[:ssl] = @ssl

  batch = @batch_size
  raise Fluent::ConfigError, "Batch size must be positive." if batch && batch <= 0

  configure_logger(@mongo_log_level)
end
run() click to toggle source
# File lib/fluent/plugin/in_mongo_tail.rb, line 104
# Timer callback: fetches documents newer than the last seen id and hands
# them to process_documents.
#
# The query is restricted to `_id > @last_id` when a last id is known, and
# capped at @batch_size documents when a batch size is set. Nothing is
# processed when the result set is empty.
#
# Failures are deliberately non-fatal so the next timer tick can retry, but
# they are logged as warnings instead of being swallowed silently. Returns
# nil when an error was rescued.
def run
  query = {}
  query['_id'] = { '$gt' => BSON::ObjectId(@last_id) } if @last_id
  documents = @collection.find(query)
  documents = documents.limit(@batch_size) if @batch_size
  process_documents(documents) if documents.count >= 1
rescue => e
  # Keep tailing best-effort, but make the failure visible to operators.
  log.warn "failed to tail documents from mongo", error: e
  nil
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mongo_tail.rb, line 93
# Stops the plugin: when an id store file is configured the latest id is
# written out and the file handle closed; then the Mongo client is closed
# and the superclass finishes its own shutdown.
def shutdown
  store_enabled = @id_store_file
  if store_enabled
    # Write the most recent id before the handle goes away.
    save_last_id
    @file.close
  end

  @client.close
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mongo_tail.rb, line 82
# Starts the plugin: opens the id store file (when configured), resolves the
# collection, determines the resume point and schedules the periodic `run`
# watcher every @wait_time.
def start
  super

  @file = get_id_store_file if @id_store_file
  @collection = get_collection

  # Without an id store file, resume tailing from the last inserted id
  # (the tailable option is obsoleted since mongo driver 2.0).
  # The lookup walks the collection, so perform it exactly once and reuse
  # the result instead of calling it for both the check and the assignment.
  unless @id_store_file
    newest_id = get_last_inserted_id
    @last_id = newest_id if newest_id
  end

  timer_execute(:in_mongo_tail_watcher, @wait_time, &method(:run))
end

Private Instance Methods

client() click to toggle source
# File lib/fluent/plugin/in_mongo_tail.rb, line 120
# Creates a Mongo client for the configured node(s).
#
# Copies database, user and password into @client_options (only the ones
# that are set) and passes that same hash to Mongo::Client.new together with
# the result of node_string.
def client
  { database: @database, user: @user, password: @password }.each do |key, value|
    @client_options[key] = value if value
  end
  Mongo::Client.new(node_string, @client_options)
end
get_collection() click to toggle source
# File lib/fluent/plugin/in_mongo_tail.rb, line 127
# Connects and returns the collection to tail.
#
# Stores the freshly created client in @client, replaces it with whatever
# authenticate returns for it, and then looks up the collection by the
# string form of @collection.
def get_collection
  @client = client
  @client = authenticate(@client)
  collection_name = @collection.to_s
  @client[collection_name]
end
get_id_store_file() click to toggle source
# File lib/fluent/plugin/in_mongo_tail.rb, line 186
# Opens the id store file for writing and returns the handle.
#
# The file is opened with mode 'w', so any existing content is truncated.
# Sync mode is switched on so every saved id is written through immediately
# instead of sitting in Ruby's IO buffer. (The previous `file.sync` only
# read the flag and had no effect.)
def get_id_store_file
  file = File.open(@id_store_file, 'w')
  file.sync = true
  file
end
get_last_id() click to toggle source
# File lib/fluent/plugin/in_mongo_tail.rb, line 192
# Reads the previously stored id from the id store file.
#
# Returns the id as a String, or nil when the file does not exist or its
# content cannot be turned into a BSON::ObjectId (any StandardError raised
# while reading or converting is treated as "no stored id").
def get_last_id
  return nil unless File.exist?(@id_store_file)

  begin
    stored = File.read(@id_store_file)
    BSON::ObjectId(stored).to_s
  rescue StandardError
    nil
  end
end
get_last_inserted_id() click to toggle source
# File lib/fluent/plugin/in_mongo_tail.rb, line 173
# Returns the `_id` of the last document yielded by an unfiltered find on
# the collection that carries a truthy `_id`, or nil when there is none.
#
# The result set is iterated directly: iterating an empty result is already
# a no-op, so no separate count query is issued first. Documents are only
# read, not modified, while looking for the id.
def get_last_inserted_id
  last_inserted_id = nil
  @collection.find.each do |doc|
    id = doc['_id']
    last_inserted_id = id if id
  end
  last_inserted_id
end
node_string() click to toggle source
# File lib/fluent/plugin/in_mongo_tail.rb, line 133
# Returns the address argument for the Mongo client.
#
# When a database is configured this is a one-element array holding
# "host:port"; otherwise it is the configured url. Returns nil when neither
# is set.
def node_string
  return ["#{@host}:#{@port}"] if @database

  @url if @url
end
process_documents(documents) click to toggle source
# File lib/fluent/plugin/in_mongo_tail.rb, line 142
# Converts fetched documents into events and emits them.
#
# documents - enumerable of hash-like documents; each document is modified
#             in place (time/tag/_id fields are removed, '_id_str' added).
#
# For each document:
# * the event time is the integer value of the @time_key field when that key
#   is configured and present, otherwise the current engine time;
# * the tag is the @tag_key field when that key is configured
#   ('mongo.missing_tag' if the field is absent), otherwise the configured
#   @tag;
# * fields listed in @object_id_keys are replaced by their string form;
# * `_id` is removed, remembered in @last_id, exposed as '_id_str' and
#   persisted when an id store file is configured.
#
# Events are grouped by tag and every group is emitted under its own tag, so
# a batch with mixed tags is no longer emitted wholesale under the tag of
# its last document, and the configured @tag is never overwritten.
def process_documents(documents)
  streams = Hash.new { |hash, key| hash[key] = Fluent::MultiEventStream.new }

  documents.each do |doc|
    raw_time = doc.delete(@time_key) if @time_key
    time = raw_time.nil? ? Fluent::Engine.now : raw_time.to_i

    tag = if @tag_key
            doc_tag = doc.delete(@tag_key)
            doc_tag.nil? ? 'mongo.missing_tag' : doc_tag
          else
            @tag
          end

    if @object_id_keys
      @object_id_keys.each { |id_key| doc[id_key] = doc[id_key].to_s }
    end

    if (id = doc.delete('_id'))
      @last_id = id.to_s
      doc['_id_str'] = @last_id
      save_last_id if @id_store_file
    end

    streams[tag].add(time, doc)
  end

  # Hash iteration follows insertion order, so tags are emitted in the order
  # they first appeared in the batch.
  streams.each { |stream_tag, es| router.emit_stream(stream_tag, es) }
end
save_last_id() click to toggle source
# File lib/fluent/plugin/in_mongo_tail.rb, line 200
# Writes the current @last_id at the very beginning of the id store file,
# overwriting from offset 0. Returns whatever the underlying write returns.
def save_last_id
  @file.seek(0)
  @file.write(@last_id)
end