class Fluent::ConfigReloaderOutput
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_config_reloader.rb, line 40
# Builds a new plugin instance.
#
# Runs the superclass initializer first, then creates the queue that #emit
# pushes events onto and that #run pops them from.
def initialize
  super()
  @q = Queue.new
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_config_reloader.rb, line 50
# Configures the plugin.
#
# The configuration element is handed to the superclass unchanged; afterwards
# the wrapped output is built from the external config file by
# #load_config_file.
#
# conf - the configuration element for this plugin.
#
# Returns whatever #load_config_file returns.
def configure(conf)
  super(conf)
  load_config_file
end
emit(tag, es, chain)
click to toggle source
# File lib/fluent/plugin/out_config_reloader.rb, line 74
# Queues one emitted event stream for later delivery.
#
# Nothing is written to the wrapped output here: the three arguments are
# bundled into an OpenStruct (fields: tag, es, chain) and pushed onto @q,
# which #run consumes.
#
# tag   - tag of the event stream.
# es    - the event stream.
# chain - the output chain to continue with once the stream is delivered.
#
# Returns the queue.
def emit(tag, es, chain)
  @q.push(OpenStruct.new(tag: tag, es: es, chain: chain))
end
outputs()
click to toggle source
# File lib/fluent/plugin/out_config_reloader.rb, line 46
# Lists the outputs managed by this plugin.
#
# Returns a fresh one-element Array holding the currently loaded output
# (the element is nil when no output has been loaded yet).
def outputs
  Array[@output]
end
shutdown()
click to toggle source
# File lib/fluent/plugin/out_config_reloader.rb, line 66
# Stops the plugin: detaches the file watcher's observers, kills the worker
# thread and shuts down the wrapped output.
#
# The watcher and thread steps are skipped when the corresponding object was
# never created (for example because #start failed part-way), so the wrapped
# output is still shut down in that case instead of being left running.
#
# Any StandardError raised along the way is logged as a warning and
# swallowed, so this method never raises to its caller.
def shutdown
  @watcher.delete_observers if @watcher
  Thread.kill(@thread) if @thread
  output_shutdown
rescue => e
  $log.warn "raises exception: #{e.class}, '#{e.message}'"
end
start()
click to toggle source
# File lib/fluent/plugin/out_config_reloader.rb, line 56
# Starts the plugin.
#
# In order: starts the wrapped output, spawns the worker thread that executes
# #run, and creates the ReloadFileWatcher for @reload_file with
# @reload_file_watch_interval, passing this plugin as its first argument.
#
# Any StandardError raised by one of these steps is logged as a warning and
# swallowed; the remaining steps are then skipped.
def start
  output_start
  @thread = Thread.new(&method(:run))
  @watcher = ReloadFileWatcher.create(self, @reload_file, @reload_file_watch_interval)
rescue => e
  $log.warn "raises exception: #{e.class}, '#{e.message}'"
end
update()
click to toggle source
# File lib/fluent/plugin/out_config_reloader.rb, line 83
# Reloads the external config file and swaps the wrapped output.
#
# NOTE(review): presumably invoked by the ReloadFileWatcher through the
# observer protocol when the reload file changes - confirm against the watcher.
#
# The new output is loaded and configured BEFORE the running one is shut
# down. If loading fails, the previously running output is put back and left
# untouched, and the error is re-raised to the caller; a broken config file
# therefore no longer leaves the plugin without a working output.
def update
  $log.warn 'config_reloader: reload config file start'
  previous = @output
  loaded = false
  begin
    load_config_file
    loaded = true
  ensure
    # load_config_file may already have replaced @output before it failed.
    @output = previous unless loaded
  end
  previous.shutdown if previous
  output_start
  $log.warn 'config_reloader: reload config file end'
end
Private Instance Methods
load_config_file()
click to toggle source
# File lib/fluent/plugin/out_config_reloader.rb, line 101
# Parses the file named by @config_file and builds the wrapped output from
# its single <store> section.
#
# Files ending in ".rb" are parsed with the Ruby config DSL (loaded lazily,
# only when needed); everything else goes through Config.parse.
#
# The plugin type is read from "@type", falling back to the legacy "type"
# parameter. @output is only replaced once the new plugin has been configured
# successfully, so a failure here leaves the previous @output in place.
#
# Returns the result of configuring the new output.
# Raises ConfigError when the file has no <store> section, more than one
# <store> section, or a <store> section without a type.
def load_config_file
  path = File.expand_path(@config_file)
  root = File.open(path) do |io|
    if File.extname(path) == '.rb'
      require 'fluent/config/dsl'
      Config::DSL::Parser.parse(io, path)
    else
      Config.parse(io, File.basename(path), File.dirname(path), false)
    end
  end

  store_elements = root.elements.select { |e| e.name == 'store' }
  if store_elements.empty?
    raise ConfigError, "Missing <store> directive in #{path}"
  end
  unless store_elements.size == 1
    raise ConfigError, "Multiple <store> directives are not available"
  end

  store_element = store_elements.first
  type = store_element['@type'] || store_element['type']
  unless type
    raise ConfigError, "Missing 'type' parameter on <store> directive"
  end

  log.debug "adding store type=#{type.dump}"
  output = Plugin.new_output(type)
  result = output.configure(store_element)
  @output = output
  result
end
output_shutdown()
click to toggle source
# File lib/fluent/plugin/out_config_reloader.rb, line 97
# Shuts down the currently loaded wrapped output.
#
# Returns whatever the output's own #shutdown returns.
def output_shutdown
  @output.shutdown
end
output_start()
click to toggle source
# File lib/fluent/plugin/out_config_reloader.rb, line 93
# Starts the currently loaded wrapped output.
#
# Returns whatever the output's own #start returns.
def output_start
  @output.start
end
run()
click to toggle source
# File lib/fluent/plugin/out_config_reloader.rb, line 127
# Worker loop: pops queued events from @q and delivers each one to the
# wrapped output.
#
# For every queued item:
# * its tag, event stream and chain are read up front (outside the rescue);
# * a stream that reports itself as not repeatable is first copied, entry by
#   entry, into a MultiEventStream;
# * delivery happens through a new OutputChain built around [@output] and
#   the original chain.
#
# A StandardError raised while copying or delivering is logged as a warning
# and the loop carries on with the next item.
def run
  loop do
    job = @q.pop
    tag, stream, upstream = job.tag, job.es, job.chain

    begin
      if !stream.repeatable?
        buffered = MultiEventStream.new
        stream.each { |time, record| buffered.add(time, record) }
        stream = buffered
      end

      OutputChain.new([@output], tag, stream, upstream).next
    rescue => e
      $log.warn "raises exception: #{e.class}, '#{e.message}, #{job}'"
    end
  end
end