class Fluent::Plugin::MongoOutput

Constants

DEFAULT_BUFFER_TYPE
FORMAT_COLLECTION_NAME_RE
LIMIT_AFTER_v1_8
LIMIT_BEFORE_v1_8

The following limits are heuristics; a BSON-encoded document is sometimes larger than its MessagePack or JSON equivalent.

Attributes

client_options[R]
collection_options[R]

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_mongo.rb, line 76
# Set up empty plugin state; the real values are populated in #configure.
def initialize
  super

  @nodes = nil                              # node list, later derived from host/port when unset
  @client_options = {}                      # options handed straight to Mongo::Client.new
  @collection_options = { capped: false }   # per-collection creation options
  @date_accessors = {}                      # record accessors keyed by date_keys field name
  @object_id_accessors = {}                 # record accessors keyed by object_id_keys field name
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_mongo.rb, line 90
# Validate and normalize the plugin configuration.
#
# Responsibilities, in order:
# * clamp buffer_chunk_limit under MongoDB's BSON size heuristic
# * default the injected time key for compat with include_time_key
# * validate auth mechanism and the connection_string/database requirement
# * translate deprecated options (tag_mapped, remove_tag_prefix, capped)
# * assemble @client_options (write concern, SSL) and the record
#   accessors for date_keys / object_id_keys
#
# Raises Fluent::ConfigError on invalid parameter combinations.
def configure(conf)
  # The LIMIT_* constants are heuristics: BSON can be bigger than the
  # buffered MessagePack, so the chunk limit is kept well under the
  # server's max document size.
  if conf.has_key?('buffer_chunk_limit')
    configured_chunk_limit_size = Fluent::Config.size_value(conf['buffer_chunk_limit'])
    estimated_limit_size = LIMIT_AFTER_v1_8
    estimated_limit_size_conf = '8m'
    # Older servers (pre-1.8) have a smaller BSON limit; honor the opt-in flag.
    if conf.has_key?('mongodb_smaller_bson_limit') && Fluent::Config.bool_value(conf['mongodb_smaller_bson_limit'])
      estimated_limit_size = LIMIT_BEFORE_v1_8
      estimated_limit_size_conf = '2m'
    end
    if configured_chunk_limit_size > estimated_limit_size
      log.warn ":buffer_chunk_limit(#{conf['buffer_chunk_limit']}) is large. Reset :buffer_chunk_limit with #{estimated_limit_size_conf}"
      conf['buffer_chunk_limit'] = estimated_limit_size_conf
    end
  else
    # No explicit limit configured: pick the safe default for the server era.
    if conf.has_key?('mongodb_smaller_bson_limit') && Fluent::Config.bool_value(conf['mongodb_smaller_bson_limit'])
      conf['buffer_chunk_limit'] = '2m'
    else
      conf['buffer_chunk_limit'] = '8m'
    end
  end
  # 'config_set_default :include_time_key, true' is ignored in compat_parameters_convert so need manual setting
  if conf.elements('inject').empty?
    if conf.has_key?('include_time_key')
      if Fluent::Config.bool_value(conf['include_time_key']) && !conf.has_key?('time_key')
        conf['time_key'] = 'time'
      end
    else
      conf['time_key'] = 'time'
    end
  end

  compat_parameters_convert(conf, :inject)

  super

  # Reject auth mechanisms the Mongo driver does not know about.
  if @auth_mech && !Mongo::Auth::SOURCES.has_key?(@auth_mech.to_sym)
    raise Fluent::ConfigError, Mongo::Auth::InvalidMechanism.new(@auth_mech.to_sym)
  end

  # Either a full connection string or an explicit database must be given.
  if @connection_string.nil? && @database.nil?
    raise Fluent::ConfigError,  "connection_string or database parameter is required"
  end

  # Deprecated: tag_mapped is emulated via the '${tag}' placeholder.
  if conf.has_key?('tag_mapped')
    log.warn "'tag_mapped' feature is replaced with built-in config placeholder. Please consider to use 'collection ${tag}'."
    @collection = '${tag}'
  end
  raise Fluent::ConfigError, "normal mode requires collection parameter" if !@tag_mapped and !conf.has_key?('collection')

  # Capped collections need a size; max document count is optional.
  if conf.has_key?('capped')
    raise Fluent::ConfigError, "'capped_size' parameter is required on <store> of Mongo output" unless conf.has_key?('capped_size')
    @collection_options[:capped] = true
    @collection_options[:size] = Fluent::Config.size_value(conf['capped_size'])
    @collection_options[:max] = Fluent::Config.size_value(conf['capped_max']) if conf.has_key?('capped_max')
  end

  # Precompile the prefix-stripping regexp used by format_collection_name.
  if remove_tag_prefix = conf['remove_tag_prefix']
    @remove_tag_prefix = Regexp.new('^' + Regexp.escape(remove_tag_prefix))
  end

  # Write concern: journaled ack always set; 'w' only when configured.
  @client_options[:write] = {j: @journaled}
  @client_options[:write].merge!({w: @write_concern}) unless @write_concern.nil?
  @client_options[:ssl] = @ssl

  if @ssl
    @client_options[:ssl_cert] = @ssl_cert
    @client_options[:ssl_key] = @ssl_key
    @client_options[:ssl_key_pass_phrase] = @ssl_key_pass_phrase
    @client_options[:ssl_verify] = @ssl_verify
    @client_options[:ssl_ca_cert] = @ssl_ca_cert
  end
  # Fall back to the single host:port when no node list was provided.
  @nodes = ["#{@host}:#{@port}"] if @nodes.nil?

  configure_logger(@mongo_log_level)

  log.debug "Setup mongo configuration: mode = #{@tag_mapped ? 'tag mapped' : 'normal'}"

  # Build record accessors once here so write-path lookups are cheap.
  if @date_keys
    @date_keys.each { |field_name|
      @date_accessors[field_name.to_s] = record_accessor_create(field_name)
    }
    log.debug "Setup record accessor for every date key"
  end
  if @object_id_keys
    @object_id_keys.each { |field_name|
      @object_id_accessors[field_name.to_s] = record_accessor_create(field_name)
    }
    log.debug "Setup record accessor for every object_id key"
  end
end
formatted_to_msgpack_binary() click to toggle source
# File lib/fluent/plugin/out_mongo.rb, line 193
# Chunks formatted by this plugin are msgpack binary; tell the buffer
# layer so it can iterate them with msgpack_each.
def formatted_to_msgpack_binary
  true
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_mongo.rb, line 197
# This output has no single-process state, so it is safe to run under
# Fluentd's multi-worker mode.
def multi_workers_ready?
  true
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_mongo.rb, line 188
# Close the MongoDB client connection before delegating the rest of the
# shutdown sequence to the superclass.
def shutdown
  @client.close
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_mongo.rb, line 181
# Establish the (authenticated) MongoDB connection and reset the
# known-collection cache before the output starts accepting chunks.
def start
  @client = client
  @client = authenticate(@client)
  @collections = {}
  super
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_mongo.rb, line 201
# Flush one buffer chunk: expand placeholders in the collection name
# (and database name, unless a connection string is in use), then insert
# every record of the chunk.
def write(chunk)
  collection_name = extract_placeholders(@collection, chunk)
  # In connection_string case, we shouldn't handle extract_placeholders for @database.
  database_name = extract_placeholders(@database, chunk) unless @connection_string
  operate(database_name, format_collection_name(collection_name), collect_records(chunk))
end

Private Instance Methods

client(database = @database) click to toggle source
# File lib/fluent/plugin/out_mongo.rb, line 210
# Build a Mongo::Client. A configured connection string wins outright;
# otherwise connect to @nodes with the assembled client options plus
# database and optional credentials.
def client(database = @database)
  return Mongo::Client.new(@connection_string) if @connection_string

  @client_options[:database] = database
  @client_options[:user] = @user if @user
  @client_options[:password] = @password if @password
  Mongo::Client.new(@nodes, @client_options)
end
collect_records(chunk) click to toggle source
# File lib/fluent/plugin/out_mongo.rb, line 221
# Convert one buffered chunk into an array of BSON-ready record hashes.
#
# Per record: inject tag/time values, convert the injected time key to a
# Ruby Time (BSON Date), coerce every configured date_keys field to Time,
# and every object_id_keys field to BSON::ObjectId. Unparsable fields are
# logged and set to nil rather than aborting the flush.
#
# Returns the array of processed records.
def collect_records(chunk)
  records = []
  time_key = @inject_config.time_key if @inject_config
  date_keys = @date_keys
  object_id_keys = @object_id_keys

  tag = chunk.metadata.tag
  chunk.msgpack_each {|time, record|
    record = inject_values_to_record(tag, time, record)
    # MongoDB uses BSON's Date for time.
    record[time_key] = Time.at(time || record[time_key]) if time_key

    if date_keys
      @date_accessors.each_pair { |date_key, date_key_accessor|
        begin
          date_value = date_key_accessor.call(record)
          case date_value
          when Fluent::EventTime
            value_to_set = date_value.to_time
          when Integer
            value_to_set = if date_value > 9999999999
                             # epoch with milliseconds: e.g. javascript
                             Time.at(date_value / 1000.0)
                           else
                             # epoch with seconds: e.g. ruby
                             Time.at(date_value)
                           end
          when Float
            value_to_set = Time.at(date_value)
          else
            if @parse_string_number_date
              if date_value.to_i.to_s == date_value
                # Integer-looking string: apply the same ms/s heuristic as
                # the Integer branch above, then let the shared Time.at
                # conversion below do the wrapping. (Bug fix: previously
                # the scaled value was stored in value_to_set and then
                # overwritten, so millisecond epochs in strings were
                # interpreted as seconds.)
                date_value = date_value.to_i
                date_value = date_value / 1000.0 if date_value > 9999999999
              elsif date_value.to_f.to_s == date_value
                date_value = date_value.to_f
              end
              value_to_set = date_value.is_a?(String) ? Time.parse(date_value) : Time.at(date_value)
            else
              value_to_set = Time.parse(date_value)
            end
          end

          date_key_accessor.set(record, value_to_set)
        rescue ArgumentError, TypeError
          # TypeError covers non-string/non-numeric values hitting
          # Time.parse/Time.at; treat them like any other parse failure.
          log.warn "Failed to parse '#{date_key}' field. Expected date types are Integer/Float/String/EventTime: #{date_value}"
          date_key_accessor.set(record, nil)
        end
      }
    end
    if object_id_keys
      @object_id_accessors.each_pair { |object_id_key, object_id_key_accessor|
        begin
          object_id_value = object_id_key_accessor.call(record)
          value_to_set = BSON::ObjectId(object_id_value)
          object_id_key_accessor.set(record, value_to_set)
        rescue BSON::ObjectId::Invalid
          log.warn "Failed to parse '#{object_id_key}' field. Expected object_id types are String: #{object_id_value}"
          object_id_key_accessor.set(record, nil)
        end
      }
    end
    records << record
  }
  records
end
collection_exists?(name) click to toggle source
# File lib/fluent/plugin/out_mongo.rb, line 308
# True when collection +name+ already exists on the server. Uses the
# listCollections command when the server supports it; otherwise falls
# back to scanning the full collection-name list.
def collection_exists?(name)
  return @client.database.collection_names.include?(name) unless list_collections_enabled?

  reply = @client.database.command(listCollections: 1, filter: { name: name }).first
  reply[:ok] && reply[:cursor][:firstBatch].size == 1
end
forget_collection(name) click to toggle source
# File lib/fluent/plugin/out_mongo.rb, line 338
# Drop +name+ from the known-collection cache so the next write
# re-checks (and, if necessary, re-creates) it on the server.
def forget_collection(name)
  @collections.delete(name)
end
format_collection_name(collection_name) click to toggle source
# File lib/fluent/plugin/out_mongo.rb, line 296
# Sanitize a collection name: strip the configured tag prefix, remove
# characters matched by FORMAT_COLLECTION_NAME_RE, and fall back to the
# configured default collection when nothing remains (e.g. empty tag).
def format_collection_name(collection_name)
  name = @remove_tag_prefix ? collection_name.gsub(@remove_tag_prefix, '') : collection_name
  name = name.gsub(FORMAT_COLLECTION_NAME_RE, '')
  name.size == 0 ? @collection : name
end
get_collection(database, name, options) click to toggle source
# File lib/fluent/plugin/out_mongo.rb, line 319
# Return the Mongo collection object for +name+, creating it on first use.
#
# When +database+ differs from the configured one (placeholder-expanded
# database names), the client is reconnected against it. Creation checks
# are memoized in @collections until forget_collection invalidates them.
def get_collection(database, name,  options)
  # Switch databases if a placeholder expanded to a different one.
  @client = client(database) if database && @database != database
  return @client[name] if @collections[name]

  unless collection_exists?(name)
    log.trace "Create collection #{name} with options #{options}"
    @client[name, options].create
    # TTL index: expire documents @expire_after seconds past their time key.
    if @expire_after > 0 && @inject_config
      log.trace "Create expiring index with key: \"#{@inject_config.time_key}\" and seconds: \"#{@expire_after}\""
      @client[name].indexes.create_one(
        {"#{@inject_config.time_key}": 1},
        expire_after: @expire_after
      )
    end
  end
  @collections[name] = true
  @client[name]
end
list_collections_enabled?() click to toggle source
# File lib/fluent/plugin/out_mongo.rb, line 304
# Whether the current primary server supports the listCollections
# command; used to pick the collection-existence check strategy.
def list_collections_enabled?
  @client.cluster.next_primary(false).features.list_collections_enabled?
end
operate(database, collection, records) click to toggle source
# File lib/fluent/plugin/out_mongo.rb, line 342
# Insert +records+ into +collection+ of +database+.
#
# Keys MongoDB rejects (dots anywhere, a leading "$") are rewritten first
# when replacements are configured. Bulk-write failures are logged and the
# collection cache is invalidated; the records array is always returned.
def operate(database, collection, records)
  begin
    records.map! { |record| replace_key_of_hash(record, ".", @replace_dot_in_key_with) } if @replace_dot_in_key_with
    records.map! { |record| replace_key_of_hash(record, /^\$/, @replace_dollar_in_key_with) } if @replace_dollar_in_key_with

    get_collection(database, collection, @collection_options).insert_many(records)
  rescue Mongo::Error::BulkWriteError => e
    log.warn "#{records.size - e.result["n_inserted"]} documents are not inserted. Maybe these documents are invalid as a BSON."
    forget_collection(collection)
  rescue ArgumentError => e
    log.warn e
  end
  records
end
replace_key_of_hash(hash_or_array, pattern, replacement) click to toggle source
# File lib/fluent/plugin/out_mongo.rb, line 365
# Recursively rewrite hash keys matching +pattern+ with +replacement+,
# descending into nested hashes and arrays. Non-collection values are
# returned unchanged; the input structure is never mutated.
def replace_key_of_hash(hash_or_array, pattern, replacement)
  case hash_or_array
  when Hash
    hash_or_array.each_with_object({}) do |(key, value), renamed|
      new_key = key.gsub(pattern, replacement)
      renamed[new_key] =
        if value.is_a?(Hash) || value.is_a?(Array)
          replace_key_of_hash(value, pattern, replacement)
        else
          value
        end
    end
  when Array
    hash_or_array.map { |element| replace_key_of_hash(element, pattern, replacement) }
  else
    hash_or_array
  end
end