module Fluent::PluginHelper::Storage

Constants

StorageState

Attributes

_storages[R]

Public Class Methods

included(mod) click to toggle source
# File lib/fluent/plugin_helper/storage.rb, line 81
def self.included(mod)
  mod.include StorageParams
end
new() click to toggle source
Calls superclass method Fluent::PluginHelper::Timer.new
# File lib/fluent/plugin_helper/storage.rb, line 87
def initialize
  super
  @_storages_started = false
  @_storages = {} # usage => storage_state
end

Public Instance Methods

after_shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin_helper/storage.rb, line 156
def after_shutdown
  storage_operate(:after_shutdown)
  super
end
before_shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin_helper/storage.rb, line 144
def before_shutdown
  storage_operate(:before_shutdown)
  super
end
close() click to toggle source
Calls superclass method
# File lib/fluent/plugin_helper/storage.rb, line 161
def close
  storage_operate(:close){|s| s.running = false }
  super
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin_helper/storage.rb, line 93
def configure(conf)
  super

  @storage_configs.each do |section|
    if @_storages[section.usage]
      raise Fluent::ConfigError, "duplicated storages configured: #{section.usage}"
    end
    storage = Plugin.new_storage(section[:@type], parent: self)
    storage.configure(section.corresponding_config_element)
    @_storages[section.usage] = StorageState.new(wrap_instance(storage), false)
  end
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin_helper/storage.rb, line 149
def shutdown
  storage_operate(:shutdown) do |s|
    s.storage.save if s.storage.save_at_shutdown
  end
  super
end
start() click to toggle source
Calls superclass method Fluent::PluginHelper::Timer#start
# File lib/fluent/plugin_helper/storage.rb, line 106
def start
  super

  @_storages_started = true
  @_storages.each_pair do |usage, s|
    s.storage.start
    s.storage.load

    if s.storage.autosave && !s.storage.persistent
      timer_execute(:storage_autosave, s.storage.autosave_interval, repeat: true) do
        begin
          s.storage.save
        rescue => e
          log.error "plugin storage failed to save its data", usage: usage, type: type, error: e
        end
      end
    end
    s.running = true
  end
end
stop() click to toggle source
Calls superclass method Fluent::PluginHelper::Timer#stop
# File lib/fluent/plugin_helper/storage.rb, line 138
def stop
  super
  # timer stops automatically in super
  storage_operate(:stop)
end
storage_create(usage: '', type: nil, conf: nil, default_type: nil) click to toggle source
# File lib/fluent/plugin_helper/storage.rb, line 32
def storage_create(usage: '', type: nil, conf: nil, default_type: nil)
  s = @_storages[usage]
  if s && s.running
    return s.storage
  elsif s
    # storage is already created, but not loaded / started
  else # !s
    type = if type
             type
           elsif conf && conf.respond_to?(:[])
             raise Fluent::ConfigError, "@type is required in <storage>" unless conf['@type']
             conf['@type']
           elsif default_type
             default_type
           else
             raise ArgumentError, "BUG: both type and conf are not specified"
           end
    storage = Plugin.new_storage(type, parent: self)
    config = case conf
             when Fluent::Config::Element
               conf
             when Hash
               # in code, programmer may use symbols as keys, but Element needs strings
               conf = Hash[conf.map{|k,v| [k.to_s, v]}]
               Fluent::Config::Element.new('storage', usage, conf, [])
             when nil
               Fluent::Config::Element.new('storage', usage, {}, [])
             else
               raise ArgumentError, "BUG: conf must be a Element, Hash (or unspecified), but '#{conf.class}'"
             end
    storage.configure(config)
    if @_storages_started
      storage.start
    end
    s = @_storages[usage] = StorageState.new(wrap_instance(storage), false)
  end

  s.storage
end
storage_operate(method_name, &block) click to toggle source
# File lib/fluent/plugin_helper/storage.rb, line 127
def storage_operate(method_name, &block)
  @_storages.each_pair do |usage, s|
    begin
      block.call(s) if block_given?
      s.storage.send(method_name)
    rescue => e
      log.error "unexpected error while #{method_name}", usage: usage, storage: s.storage, error: e
    end
  end
end
terminate() click to toggle source
Calls superclass method Fluent::PluginHelper::Timer#terminate
# File lib/fluent/plugin_helper/storage.rb, line 166
def terminate
  storage_operate(:terminate)
  @_storages = {}
  super
end
wrap_instance(storage) click to toggle source
# File lib/fluent/plugin_helper/storage.rb, line 172
def wrap_instance(storage)
  if storage.persistent && storage.persistent_always?
    storage
  elsif storage.persistent
    PersistentWrapper.new(storage)
  elsif !storage.synchronized?
    SynchronizeWrapper.new(storage)
  else
    storage
  end
end