class Fluent::Plugin::MongoStorage
Constants
- FORMAT_COLLECTION_NAME_RE
Attributes
client_options[R]
collection_options[R]
store[R]
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/storage_mongo.rb, line 49 def initialize super @client_options = {} @collection_options = {capped: false} @store = {} end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/storage_mongo.rb, line 57 def configure(conf) super unless @path if conf && !conf.arg.empty? @path = conf.arg else raise Fluent::ConfigError, "path or conf.arg for <storage> is required." end end if conf.has_key?('capped') and Fluent::Config.bool_value(conf['capped']) raise Fluent::ConfigError, "'capped_size' parameter is required on <storage> of Mongo storage" unless conf.has_key?('capped_size') @collection_options[:capped] = true @collection_options[:size] = Fluent::Config.size_value(conf['capped_size']) @collection_options[:max] = Fluent::Config.size_value(conf['capped_max']) if conf.has_key?('capped_max') end @client_options[:write] = {j: @journaled} @client_options[:write].merge!({w: @write_concern}) unless @write_concern.nil? @client_options[:ssl] = @ssl if @ssl @client_options[:ssl_cert] = @ssl_cert @client_options[:ssl_key] = @ssl_key @client_options[:ssl_key_pass_phrase] = @ssl_key_pass_phrase @client_options[:ssl_verify] = @ssl_verify @client_options[:ssl_ca_cert] = @ssl_ca_cert end configure_logger(@mongo_log_level) @client = client @client = authenticate(@client) end
delete(key)
click to toggle source
# File lib/fluent/plugin/storage_mongo.rb, line 134 def delete(key) @store.delete(key.to_s) end
fetch(key, defval)
click to toggle source
# File lib/fluent/plugin/storage_mongo.rb, line 126 def fetch(key, defval) @store.fetch(key.to_s, defval) end
get(key)
click to toggle source
# File lib/fluent/plugin/storage_mongo.rb, line 122 def get(key) @store[key.to_s] end
load()
click to toggle source
# File lib/fluent/plugin/storage_mongo.rb, line 96 def load begin value = {} documents = @client[format_collection_name(@collection)].find(_id: @path) if documents.count >= 1 documents.each do |document| value.merge!(document) end end value.delete('_id') # just ignore '_id' unless value.is_a?(Hash) log.error "broken content for plugin storage (Hash required: ignored)", type: json.class log.debug "broken content", content: json_string return end @store = value rescue => e log.error "failed to load data for plugin storage from mongo", path: @path, error: e end end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/storage_mongo.rb, line 92 def multi_workers_ready? true end
put(key, value)
click to toggle source
# File lib/fluent/plugin/storage_mongo.rb, line 130 def put(key, value) @store[key.to_s] = value end
save()
click to toggle source
# File lib/fluent/plugin/storage_mongo.rb, line 118 def save operate(format_collection_name(@collection), @store) end
update(key, &block)
click to toggle source
# File lib/fluent/plugin/storage_mongo.rb, line 138 def update(key, &block) @store[key.to_s] = block.call(@store[key.to_s]) end
Private Instance Methods
client()
click to toggle source
# File lib/fluent/plugin/storage_mongo.rb, line 144 def client @client_options[:database] = @database @client_options[:user] = @user if @user @client_options[:password] = @password if @password Mongo::Client.new(["#{@host}:#{@port}"], @client_options) end
format_collection_name(collection_name)
click to toggle source
# File lib/fluent/plugin/storage_mongo.rb, line 153 def format_collection_name(collection_name) formatted = collection_name formatted = formatted.gsub(FORMAT_COLLECTION_NAME_RE, '') formatted = @collection if formatted.size == 0 # set default for nil tag formatted end
format_key(record)
click to toggle source
# File lib/fluent/plugin/storage_mongo.rb, line 160 def format_key(record) if @replace_dot_in_key_with record = replace_key_of_hash(record, ".", @replace_dot_in_key_with) end if @replace_dollar_in_key_with record = replace_key_of_hash(record, /^\$/, @replace_dollar_in_key_with) end if @replace_underbar_id_in_key_with record = replace_key_of_hash(record, /^_id$/, @replace_underbar_id_in_key_with) end record end
operate(collection, record)
click to toggle source
# File lib/fluent/plugin/storage_mongo.rb, line 173 def operate(collection, record) begin record = format_key(record) @client[collection, @collection_options].update_one({_id: @path}, record, {upsert: true}) rescue Mongo::Error::BulkWriteError => e log.warn "document is not inserted. Maybe this document is invalid as a BSON." rescue ArgumentError => e log.warn e end record end
replace_key_of_hash(hash_or_array, pattern, replacement)
click to toggle source
# File lib/fluent/plugin/storage_mongo.rb, line 187 def replace_key_of_hash(hash_or_array, pattern, replacement) case hash_or_array when Array hash_or_array.map do |elm| replace_key_of_hash(elm, pattern, replacement) end when Hash result = Hash.new hash_or_array.each_pair do |k, v| k = k.gsub(pattern, replacement) if v.is_a?(Hash) || v.is_a?(Array) result[k] = replace_key_of_hash(v, pattern, replacement) else result[k] = v end end result else hash_or_array end end