class Fluent::ConfigReloaderOutput

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_config_reloader.rb, line 40
# Builds the plugin instance: runs the superclass initializer, then creates
# the queue that #emit pushes onto and #run pops from.
def initialize
  super()

  @q = Queue.new
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_config_reloader.rb, line 50
# Applies the plugin configuration.
#
# Hands +conf+ to the superclass, then builds the wrapped output by reading
# the file named by @config_file (see load_config_file).
# NOTE(review): @config_file is presumably populated by the superclass from
# +conf+ -- confirm against the plugin's config_param declarations.
#
# conf - configuration element, passed through to the superclass unchanged.
#
# Returns whatever load_config_file returns.
def configure(conf)
  super(conf)

  load_config_file
end
emit(tag, es, chain) click to toggle source
# File lib/fluent/plugin/out_config_reloader.rb, line 74
# Queues one emit request for the worker loop (#run) instead of forwarding
# it inline.
#
# tag   - tag of the event stream.
# es    - the event stream itself.
# chain - the output chain the caller handed in; #run passes it on to the
#         OutputChain it builds.
#
# The three values are bundled into an OpenStruct and pushed onto @q.
def emit(tag, es, chain)
  @q.push OpenStruct.new(tag: tag, es: es, chain: chain)
end
outputs() click to toggle source
# File lib/fluent/plugin/out_config_reloader.rb, line 46
# Returns the currently wrapped output as a one-element array (the element
# is nil if no output has been loaded yet).
def outputs
  Array[@output]
end
shutdown() click to toggle source
# File lib/fluent/plugin/out_config_reloader.rb, line 66
# Stops the plugin: detaches the file watcher's observers, kills the worker
# thread, and shuts down the wrapped output.
#
# #start swallows its own failures, so it can leave @watcher (or @thread)
# unset. Each step is therefore guarded: a missing watcher must not prevent
# the worker thread from being killed or the output from being shut down.
#
# Any StandardError raised during teardown is logged as a warning and not
# re-raised.
def shutdown
  @watcher.delete_observers if @watcher
  @thread.kill if @thread
  output_shutdown
rescue
  $log.warn "raises exception: #{$!.class}, '#{$!.message}'"
end
start() click to toggle source
# File lib/fluent/plugin/out_config_reloader.rb, line 56
# Starts the plugin: starts the wrapped output, spawns the worker thread
# that executes #run, and creates the reload-file watcher.
#
# ReloadFileWatcher.create receives this plugin, @reload_file and
# @reload_file_watch_interval.
# NOTE(review): presumably the watcher registers this plugin as an observer
# and calls #update when the reload file changes -- confirm in
# ReloadFileWatcher.
#
# Any StandardError raised along the way is logged as a warning and not
# re-raised, so later steps may be left undone (e.g. @watcher stays unset).
def start
  output_start
  @thread = Thread.new(&method(:run))

  @watcher = ReloadFileWatcher.create(self, @reload_file, @reload_file_watch_interval)
rescue
  $log.warn "raises exception: #{$!.class}, '#{$!.message}'"
end
update() click to toggle source
# File lib/fluent/plugin/out_config_reloader.rb, line 83
# Reloads the wrapped output from the config file.
#
# The new output is loaded and configured BEFORE the current one is shut
# down. If loading fails (unreadable file, invalid config, ...), the
# previous output is put back untouched and the error is re-raised, so a
# broken config file no longer leaves the plugin holding an output that has
# already been shut down.
#
# On success the previous output is shut down and the new one is started.
#
# Raises whatever load_config_file raises.
def update
  $log.warn 'config_reloader: reload config file start'
  previous = @output
  begin
    load_config_file
  rescue
    # load_config_file may have replaced @output before failing.
    @output = previous
    $log.warn "config_reloader: reload config file failed, keeping previous output: #{$!.class}, '#{$!.message}'"
    raise
  end
  # @output now refers to the new output, so stop the old one directly.
  previous.shutdown
  output_start
  $log.warn 'config_reloader: reload config file end'
end

Private Instance Methods

load_config_file() click to toggle source
# File lib/fluent/plugin/out_config_reloader.rb, line 101
# Parses the file named by @config_file and builds the wrapped output from
# its single <store> element.
#
# Files ending in '.rb' are parsed with Config::DSL::Parser; everything else
# goes through Config.parse. Exactly one <store> element is required and it
# must carry a 'type' parameter, which is used to instantiate the output via
# Plugin.new_output.
#
# @output is only replaced after the new output has been configured
# successfully, so a failing configure leaves the previous value intact.
#
# Returns the result of the new output's #configure call.
# Raises ConfigError when there is no <store>, more than one <store>, or the
# <store> has no 'type'. Errors from opening/parsing the file or from the
# output's #configure propagate unchanged.
def load_config_file
  path = File.expand_path(@config_file)

  root = File.open(path) do |io|
    if File.extname(path) == '.rb'
      # Loaded lazily: the DSL parser is only needed for '.rb' config files.
      require 'fluent/config/dsl'
      Config::DSL::Parser.parse(io, path)
    else
      Config.parse(io, File.basename(path), File.dirname(path), false)
    end
  end
  store_elements = root.elements.select { |e| e.name == 'store' }

  raise ConfigError, "Missing <store> directive" if store_elements.empty?
  raise ConfigError, "Multiple <store> directives are not available" if store_elements.size > 1

  store_element = store_elements.first

  type = store_element['type']
  raise ConfigError, "Missing 'type' parameter on <store> directive" unless type
  log.debug "adding store type=#{type.dump}"

  output = Plugin.new_output(type)
  result = output.configure(store_element)
  @output = output
  result
end
output_shutdown() click to toggle source
# File lib/fluent/plugin/out_config_reloader.rb, line 97
# Shuts down the currently wrapped output and returns whatever its
# #shutdown returns.
def output_shutdown
  current = @output
  current.shutdown
end
output_start() click to toggle source
# File lib/fluent/plugin/out_config_reloader.rb, line 93
# Starts the currently wrapped output and returns whatever its #start
# returns.
def output_start
  current = @output
  current.start
end
run() click to toggle source
# File lib/fluent/plugin/out_config_reloader.rb, line 127
# Worker loop, executed on the thread created in #start.
#
# Repeatedly pops one queued emit request from @q and forwards it to the
# currently wrapped output through a fresh OutputChain. An event stream that
# is not repeatable is first copied into a MultiEventStream.
#
# A StandardError raised while forwarding is logged as a warning and the
# loop carries on with the next request.
def run
  loop do
    param = @q.pop

    tag = param.tag
    stream = param.es
    upstream_chain = param.chain

    begin
      unless stream.repeatable?
        buffered = MultiEventStream.new
        stream.each { |time, record| buffered.add(time, record) }
        stream = buffered
      end

      OutputChain.new([@output], tag, stream, upstream_chain).next
    rescue
      $log.warn "raises exception: #{$!.class}, '#{$!.message}, #{param}'"
    end
  end
end