class Fluent::Plugin::LevelDBStorage

Constants

DEFAULT_DIR_MODE

Attributes

leveldb[R]
store[R]

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/storage_leveldb.rb, line 21
# Runs the superclass initializer, then starts with an empty in-memory
# store (a plain Hash held in @store).
def initialize
  super

  @store = {}
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/storage_leveldb.rb, line 27
# Resolves where the LevelDB database lives, checks that the location is
# usable, opens the database and validates any previously saved content.
#
# Path resolution:
# * @path set and pointing at a regular file -> Fluent::ConfigError.
# * @path set and either an existing directory or not created yet ->
#   the database is placed at "<path>/worker<N>/storage.db".
# * @path unset but the owner plugin has a root dir -> the database is
#   placed at "<root_dir>/storage.db" (or "storage.<arg>.db" when the
#   <storage> section has a non-empty argument).
# * neither available -> Fluent::ConfigError.
#
# NOTE(review): @path, @dir_mode and @root_key are not assigned in this
# method; presumably they are config parameters set by `super` -- confirm.
#
# @param conf the <storage> configuration section; only `conf.arg` is read here
# @raise [Fluent::ConfigError] on an unusable path or unreadable/invalid saved content
def configure(conf)
  super

  if @path
    if File.exist?(@path) && File.file?(@path)
      raise Fluent::ConfigError, "path for <storage> should be a directory."
    elsif File.directory?(@path) || !File.exist?(@path)
      # A path that does not exist yet is handled exactly like an existing
      # directory. Otherwise the first run would open the database at @path
      # itself, and every later run (where @path now exists as a directory)
      # would open "<path>/worker<N>/storage.db" and lose the earlier data.
      @path = File.join(@path, "worker#{fluentd_worker_id}", "storage.db")
      @multi_workers_available = true
    end
  elsif (root_dir = owner.plugin_root_dir)
    basename = (conf.arg && !conf.arg.empty?) ? "storage.#{conf.arg}.db" : "storage.db"
    @path = File.join(root_dir, basename)
    @multi_workers_available = true
  else
    raise Fluent::ConfigError, "path for <storage> is required."
  end

  # Make sure the parent directory exists and that we can use the target.
  dir = File.dirname(@path)
  FileUtils.mkdir_p(dir, mode: @dir_mode) unless Dir.exist?(dir)
  if File.exist?(@path)
    raise Fluent::ConfigError, "Plugin storage path '#{@path}' is not readable/writable" unless File.readable?(@path) && File.writable?(@path)
  else
    raise Fluent::ConfigError, "Directory is not writable for plugin storage file '#{@path}'" unless File.stat(dir).writable?
  end

  @leveldb = LevelDB::DB.new(@path)
  object = @leveldb.get(@root_key)
  if object
    begin
      data = Yajl::Parser.parse(object)
    rescue => e
      log.error "failed to read data from plugin leveldb storage", path: @path, error: e
      raise Fluent::ConfigError, "Unexpected error: failed to read data from plugin leveldb storage: '#{@path}'"
    end
    # Checked outside the rescue above so this specific message reaches the
    # user instead of being replaced by the generic "Unexpected error" one.
    unless data.is_a?(Hash)
      raise Fluent::ConfigError, "Invalid contents (not object) in plugin leveldb storage: '#{@path}'"
    end
  end
end
delete(key) click to toggle source
# File lib/fluent/plugin/storage_leveldb.rb, line 108
# Removes the entry stored under +key+ from the in-memory store.
# The key is converted with #to_s before the lookup.
#
# @return the value Hash#delete returns for that key
def delete(key)
  name = key.to_s
  @store.delete(name)
end
fetch(key, defval) click to toggle source
# File lib/fluent/plugin/storage_leveldb.rb, line 100
# Looks up +key+ (converted with #to_s) in the in-memory store and falls
# back to +defval+ when the key is absent.
def fetch(key, defval)
  name = key.to_s
  @store.fetch(name, defval)
end
get(key) click to toggle source
# File lib/fluent/plugin/storage_leveldb.rb, line 96
# Reads the value stored under +key+ (converted with #to_s) from the
# in-memory store.
def get(key)
  name = key.to_s
  @store[name]
end
load() click to toggle source
# File lib/fluent/plugin/storage_leveldb.rb, line 70
# Replaces the in-memory store with the JSON object saved in LevelDB under
# @root_key.
#
# When nothing is stored under the root key yet, the in-memory store is
# left untouched and nothing is logged. Content that parses to something
# other than a Hash is logged and ignored. Any exception raised while
# reading or parsing is logged rather than propagated.
def load
  json_string = @leveldb.get(@root_key)
  # No saved data (e.g. nothing has been written under the root key yet).
  # `configure` treats a nil result the same way; without this guard the
  # parser is handed nil and a misleading "failed to load" error is logged.
  return if json_string.nil?

  json = Yajl::Parser.parse(json_string)
  unless json.is_a?(Hash)
    log.error "broken content for plugin storage (Hash required: ignored)", type: json.class
    log.debug "broken content", content: json_string
    return
  end
  @store = json
rescue => e
  log.error "failed to load data for plugin storage from leveldb", path: @path, error: e
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/storage_leveldb.rb, line 66
# Returns the value of @multi_workers_available as assigned in #configure
# (nil if it was never assigned).
def multi_workers_ready?
  @multi_workers_available
end
put(key, value) click to toggle source
# File lib/fluent/plugin/storage_leveldb.rb, line 104
# Stores +value+ in the in-memory store under +key+ (converted with #to_s).
#
# @return the assigned value
def put(key, value)
  name = key.to_s
  @store[name] = value
end
save() click to toggle source
# File lib/fluent/plugin/storage_leveldb.rb, line 85
# Serializes the in-memory store to JSON and writes it to LevelDB under
# @root_key, issuing the put from inside a `batch` block. Any exception
# raised while encoding or writing is logged rather than propagated.
def save
  payload = Yajl::Encoder.encode(@store)
  @leveldb.batch { @leveldb.put(@root_key, payload) }
rescue => e
  log.error "failed to save data for plugin storage to leveldb", path: @path, error: e
end
update(key, &block) click to toggle source
# File lib/fluent/plugin/storage_leveldb.rb, line 112
# Replaces the value stored under +key+ (converted with #to_s) with the
# result of calling +block+ on the current value.
#
# @return the newly stored value
def update(key, &block)
  name = key.to_s
  current = @store[name]
  @store[name] = block.call(current)
end