# File lib/fluent/plugin_helper/storage.rb, line 81
# Inclusion hook: whatever receives this helper also gets StorageParams mixed in.
#
# @param mod [Module] the module or class that is including this helper
# @return [Module] +mod+ (the value returned by Module#include)
def self.included(mod)
  mod.include(StorageParams)
end
# File lib/fluent/plugin_helper/storage.rb, line 87
# Prepares the helper's bookkeeping after the rest of the initialize chain ran:
# an empty storage registry and a "not started yet" flag.
def initialize
  super
  @_storages = {} # usage => storage_state
  @_storages_started = false
end
# File lib/fluent/plugin_helper/storage.rb, line 156
# Forwards the after_shutdown phase to every registered storage, then hands
# control to the next after_shutdown in the ancestor chain.
def after_shutdown
  storage_operate(:after_shutdown)

  super
end
# File lib/fluent/plugin_helper/storage.rb, line 144
# Forwards the before_shutdown phase to every registered storage, then hands
# control to the next before_shutdown in the ancestor chain.
def before_shutdown
  storage_operate(:before_shutdown)

  super
end
# File lib/fluent/plugin_helper/storage.rb, line 161
# Closes every registered storage. Each storage state is flagged as no longer
# running just before its storage receives +close+; afterwards the ancestor
# chain's +close+ runs.
def close
  storage_operate(:close) do |state|
    state.running = false
  end

  super
end
# File lib/fluent/plugin_helper/storage.rb, line 93
# Instantiates and configures one storage for each entry of @storage_configs,
# registering it under the section's usage in a non-running state.
#
# @param conf the configuration handed to the plugin; it is consumed by +super+
# @raise [Fluent::ConfigError] when two sections declare the same usage
def configure(conf)
  super

  @storage_configs.each do |section|
    usage = section.usage
    raise Fluent::ConfigError, "duplicated storages configured: #{usage}" if @_storages[usage]

    storage = Plugin.new_storage(section[:@type], parent: self)
    storage.configure(section.corresponding_config_element)

    # Registered as "not running" here; the flag is flipped once the storage is started.
    @_storages[usage] = StorageState.new(wrap_instance(storage), false)
  end
end
# File lib/fluent/plugin_helper/storage.rb, line 149
# Shuts down every registered storage. A storage whose +save_at_shutdown+ is
# truthy is saved right before it receives +shutdown+; afterwards the ancestor
# chain's +shutdown+ runs.
def shutdown
  storage_operate(:shutdown) do |state|
    storage = state.storage
    storage.save if storage.save_at_shutdown
  end

  super
end
# File lib/fluent/plugin_helper/storage.rb, line 106
# Starts and loads every registered storage and marks it as running.
# Storages with +autosave+ enabled that are not +persistent+ additionally get a
# repeating timer that saves them every +autosave_interval+.
def start
  super

  @_storages_started = true

  @_storages.each_pair do |usage, state|
    storage = state.storage
    storage.start
    storage.load

    autosave_needed = storage.autosave && !storage.persistent
    if autosave_needed
      timer_execute(:storage_autosave, storage.autosave_interval, repeat: true) do
        begin
          # Looked up through the state on every tick rather than captured up front.
          state.storage.save
        rescue => e
          # A failed save is only reported, so later ticks still get their chance.
          # NOTE(review): `type` is not defined in this method; presumably it resolves
          # to a method of the including plugin -- confirm.
          log.error "plugin storage failed to save its data", usage: usage, type: type, error: e
        end
      end
    end

    state.running = true
  end
end
# File lib/fluent/plugin_helper/storage.rb, line 138
# Runs the ancestor chain's +stop+ first, then sends +stop+ to every
# registered storage.
def stop
  # timer stops automatically in super
  super

  storage_operate(:stop)
end
# File lib/fluent/plugin_helper/storage.rb, line 32 def storage_create(usage: '', type: nil, conf: nil, default_type: nil) s = @_storages[usage] if s && s.running return s.storage elsif s # storage is already created, but not loaded / started else # !s type = if type type elsif conf && conf.respond_to?(:[]) raise Fluent::ConfigError, "@type is required in <storage>" unless conf['@type'] conf['@type'] elsif default_type default_type else raise ArgumentError, "BUG: both type and conf are not specified" end storage = Plugin.new_storage(type, parent: self) config = case conf when Fluent::Config::Element conf when Hash # in code, programmer may use symbols as keys, but Element needs strings conf = Hash[conf.map{|k,v| [k.to_s, v]}] Fluent::Config::Element.new('storage', usage, conf, []) when nil Fluent::Config::Element.new('storage', usage, {}, []) else raise ArgumentError, "BUG: conf must be a Element, Hash (or unspecified), but '#{conf.class}'" end storage.configure(config) if @_storages_started storage.start end s = @_storages[usage] = StorageState.new(wrap_instance(storage), false) end s.storage end
# File lib/fluent/plugin_helper/storage.rb, line 127
# Sends +method_name+ to the storage of every registered storage state.
# When a block is given, it is called with the storage state right before that
# state's storage receives the message.
#
# An error raised by the block or by the storage is logged, and iteration then
# continues with the next storage.
#
# @param method_name [Symbol] message sent to each storage
# @yieldparam state the storage state about to be operated on
def storage_operate(method_name, &block)
  @_storages.each_pair do |usage, state|
    begin
      yield state if block_given?
      state.storage.send(method_name)
    rescue => e
      log.error "unexpected error while #{method_name}", usage: usage, storage: state.storage, error: e
    end
  end
end
# File lib/fluent/plugin_helper/storage.rb, line 166
# Sends +terminate+ to every registered storage, empties the storage registry,
# and then runs the ancestor chain's +terminate+.
def terminate
  storage_operate(:terminate)

  # Drop all storage states so none of them stays reachable through the registry.
  @_storages = {}

  super
end
# File lib/fluent/plugin_helper/storage.rb, line 172
# Picks the wrapper (if any) a storage instance is registered with:
#
# * persistent and +persistent_always?+ -> the storage itself
# * persistent otherwise                -> PersistentWrapper around it
# * not persistent, not synchronized    -> SynchronizeWrapper around it
# * not persistent, synchronized        -> the storage itself
#
# @param storage the storage instance to wrap
# @return the storage itself or a wrapper around it
def wrap_instance(storage)
  if storage.persistent
    return storage if storage.persistent_always?

    return PersistentWrapper.new(storage)
  end

  return storage if storage.synchronized?

  SynchronizeWrapper.new(storage)
end