class Fluent::Plugin::MongoStorage

Constants

FORMAT_COLLECTION_NAME_RE

Attributes

client_options[R]
collection_options[R]
store[R]

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/storage_mongo.rb, line 49
# Builds a storage instance with empty defaults: an empty in-memory store,
# collection options that mark the collection as not capped, and an empty
# set of client options (filled in later by #configure / #client).
def initialize
  super

  @store = {}
  @collection_options = {capped: false}
  @client_options = {}
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/storage_mongo.rb, line 57
# Applies the <storage> configuration.
#
# - Falls back to conf.arg for @path when no path was configured, raising
#   Fluent::ConfigError if neither is available.
# - When 'capped' is enabled, requires 'capped_size' and records the capped
#   collection options (optionally 'capped_max').
# - Assembles the write-concern and SSL client options, configures the
#   logger, then creates and authenticates the client.
def configure(conf)
  super

  unless @path
    raise Fluent::ConfigError, "path or conf.arg for <storage> is required." if !conf || conf.arg.empty?

    @path = conf.arg
  end

  capped_requested = conf.has_key?('capped') && Fluent::Config.bool_value(conf['capped'])
  if capped_requested
    unless conf.has_key?('capped_size')
      raise Fluent::ConfigError, "'capped_size' parameter is required on <storage> of Mongo storage"
    end

    @collection_options[:capped] = true
    @collection_options[:size] = Fluent::Config.size_value(conf['capped_size'])
    if conf.has_key?('capped_max')
      @collection_options[:max] = Fluent::Config.size_value(conf['capped_max'])
    end
  end

  # The write concern level (:w) is only sent when explicitly configured.
  write_options = {j: @journaled}
  write_options[:w] = @write_concern unless @write_concern.nil?
  @client_options[:write] = write_options
  @client_options[:ssl] = @ssl

  if @ssl
    @client_options.merge!(
      ssl_cert: @ssl_cert,
      ssl_key: @ssl_key,
      ssl_key_pass_phrase: @ssl_key_pass_phrase,
      ssl_verify: @ssl_verify,
      ssl_ca_cert: @ssl_ca_cert
    )
  end
  configure_logger(@mongo_log_level)

  @client = authenticate(client)
end
delete(key) click to toggle source
# File lib/fluent/plugin/storage_mongo.rb, line 134
# Removes the entry stored under +key+ (stringified) from the in-memory
# store and returns the removed value, or nil when the key was absent.
def delete(key)
  name = key.to_s
  @store.delete(name)
end
fetch(key, defval) click to toggle source
# File lib/fluent/plugin/storage_mongo.rb, line 126
# Returns the value stored under +key+ (stringified), or +defval+ when the
# in-memory store has no such key.
def fetch(key, defval)
  @store.fetch(key.to_s) { defval }
end
get(key) click to toggle source
# File lib/fluent/plugin/storage_mongo.rb, line 122
# Looks up +key+ (stringified) in the in-memory store; nil when absent.
def get(key)
  name = key.to_s
  @store[name]
end
load() click to toggle source
# File lib/fluent/plugin/storage_mongo.rb, line 96
# Reloads the in-memory store from MongoDB.
#
# Every document whose _id equals @path in the (formatted) collection is
# merged into one hash, the '_id' field is dropped, and the result replaces
# @store. Any error raised while querying is logged and @store is left
# unchanged.
def load
  loaded = {}
  # Iterating an empty result set is a no-op, so no separate count query is
  # needed before merging.
  @client[format_collection_name(@collection)].find(_id: @path).each do |document|
    loaded.merge!(document)
  end
  loaded.delete('_id') # just ignore '_id'

  @store = loaded
rescue => e
  log.error "failed to load data for plugin storage from mongo", path: @path, error: e
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/storage_mongo.rb, line 92
# Always returns true.
# NOTE(review): presumably consulted by Fluentd to decide whether this
# plugin may run under multiple workers -- confirm against the base class.
def multi_workers_ready?
  true
end
put(key, value) click to toggle source
# File lib/fluent/plugin/storage_mongo.rb, line 130
# Stores +value+ under +key+ (stringified) in the in-memory store and
# returns +value+.
def put(key, value)
  name = key.to_s
  @store[name] = value
end
save() click to toggle source
# File lib/fluent/plugin/storage_mongo.rb, line 118
# Writes the current in-memory store to the (formatted) collection via
# #operate and returns whatever #operate returns.
def save
  target = format_collection_name(@collection)
  operate(target, @store)
end
update(key, &block) click to toggle source
# File lib/fluent/plugin/storage_mongo.rb, line 138
# Replaces the value stored under +key+ (stringified) with the result of
# calling the block with the current value; returns the new value.
def update(key, &block)
  name = key.to_s
  current = @store[name]
  @store[name] = block.call(current)
end

Private Instance Methods

client() click to toggle source
# File lib/fluent/plugin/storage_mongo.rb, line 144
# Creates a Mongo::Client for "@host:@port".
#
# Adds the database name to @client_options, plus :user / :password when
# those are set, and passes the resulting options to the client.
def client
  credentials = {user: @user, password: @password}.select { |_name, value| value }
  @client_options[:database] = @database
  @client_options.merge!(credentials)
  Mongo::Client.new(["#{@host}:#{@port}"], @client_options)
end
format_collection_name(collection_name) click to toggle source
# File lib/fluent/plugin/storage_mongo.rb, line 153
# Strips every match of FORMAT_COLLECTION_NAME_RE from +collection_name+.
# When nothing is left, falls back to the configured @collection.
def format_collection_name(collection_name)
  stripped = collection_name.gsub(FORMAT_COLLECTION_NAME_RE, '')
  return @collection if stripped.empty? # set default for nil tag

  stripped
end
format_key(record) click to toggle source
# File lib/fluent/plugin/storage_mongo.rb, line 160
# Applies the configured key substitutions to +record+, in order: dots
# anywhere in a key, a leading '$', and keys that are exactly '_id'.
# A substitution is skipped when its replacement setting is unset.
def format_key(record)
  substitutions = [
    [".", @replace_dot_in_key_with],
    [/^\$/, @replace_dollar_in_key_with],
    [/^_id$/, @replace_underbar_id_in_key_with]
  ]

  substitutions.reduce(record) do |current, (pattern, replacement)|
    replacement ? replace_key_of_hash(current, pattern, replacement) : current
  end
end
operate(collection, record) click to toggle source
# File lib/fluent/plugin/storage_mongo.rb, line 173
# Formats the keys of +record+ and upserts it into +collection+ under
# _id = @path.
#
# Mongo::Error::BulkWriteError and ArgumentError are logged as warnings
# rather than raised. Returns the formatted record, or the record as passed
# in when formatting itself raised one of those errors.
def operate(collection, record)
  begin
    record = format_key(record)

    selector = {_id: @path}
    target = @client[collection, @collection_options]
    target.update_one(selector, record, {upsert: true})
  rescue Mongo::Error::BulkWriteError
    log.warn "document is not inserted. Maybe this document is invalid as a BSON."
  rescue ArgumentError => error
    log.warn error
  end
  record
end
replace_key_of_hash(hash_or_array, pattern, replacement) click to toggle source

Copied from fluent-plugin-mongo's out_mongo.rb: https://github.com/fluent/fluent-plugin-mongo/blob/c989ae01d21513c8d45b5338431586542aa93b0d/lib/fluent/plugin/out_mongo.rb#L223-L244

# File lib/fluent/plugin/storage_mongo.rb, line 187
# Recursively rewrites hash keys, replacing every match of +pattern+ in a
# key with +replacement+.
#
# hash_or_array - a Hash, an Array (whose elements are processed
#                 recursively), or any other value.
# pattern       - String or Regexp handed to String#gsub.
# replacement   - replacement String handed to String#gsub.
#
# Returns a new Hash / Array with rewritten keys; any other value is
# returned unchanged. The input is not modified.
def replace_key_of_hash(hash_or_array, pattern, replacement)
  case hash_or_array
  when Array
    hash_or_array.map { |elm| replace_key_of_hash(elm, pattern, replacement) }
  when Hash
    hash_or_array.each_with_object({}) do |(key, value), result|
      # Symbol keys (e.g. in nested values supplied by a plugin) have no
      # #gsub and used to raise NoMethodError here; stringify them so the
      # replacement still applies. Other non-String keys are left as they are.
      key = key.to_s if key.is_a?(Symbol)
      key = key.gsub(pattern, replacement) if key.is_a?(String)

      # Non-collection values come back unchanged from the recursive call.
      result[key] = replace_key_of_hash(value, pattern, replacement)
    end
  else
    hash_or_array
  end
end