add_plugin_dir(dir)
click to toggle source
# Forwards +dir+ to Plugin.add_plugin_dir.
#
# Returns whatever Plugin.add_plugin_dir returns.
def add_plugin_dir(dir)
  Plugin.add_plugin_dir(dir)
end
emit(tag, time, record)
click to toggle source
# Always raises a RuntimeError telling the caller to use router.emit instead.
# The arguments are accepted but never used.
def emit(tag, time, record)
  raise RuntimeError, "BUG: use router.emit instead of Engine.emit"
end
emit_array(tag, array)
click to toggle source
# Always raises a RuntimeError telling the caller to use router.emit_array instead.
# The arguments are accepted but never used.
def emit_array(tag, array)
  raise RuntimeError, "BUG: use router.emit_array instead of Engine.emit_array"
end
emit_stream(tag, es)
click to toggle source
# Always raises a RuntimeError telling the caller to use router.emit_stream instead.
# The arguments are accepted but never used.
def emit_stream(tag, es)
  raise RuntimeError, "BUG: use router.emit_stream instead of Engine.emit_stream"
end
flush!()
click to toggle source
# Calls flush! on @root_agent and returns that call's result.
def flush!
  agent = @root_agent
  agent.flush!
end
init(system_config)
click to toggle source
# Stores +system_config+ and prepares the engine's collaborators.
#
# Steps, in order:
# * remember the config in @system_config
# * turn off reverse DNS lookup on BasicSocket
# * apply each optional setting (emit_error_log_interval,
#   suppress_config_dump, without_source) only when the config returns a
#   non-nil value for it
# * build @root_agent from the current log and the stored config
# * call MessagePackFactory.init
#
# Returns self.
def init(system_config)
  @system_config = system_config

  BasicSocket.do_not_reverse_lookup = true

  error_log_interval = system_config.emit_error_log_interval
  suppress_interval(error_log_interval) unless error_log_interval.nil?

  config_dump_flag = system_config.suppress_config_dump
  @suppress_config_dump = config_dump_flag unless config_dump_flag.nil?

  without_source_flag = system_config.without_source
  @without_source = without_source_flag unless without_source_flag.nil?

  @root_agent = RootAgent.new(log: log, system_config: @system_config)
  MessagePackFactory.init

  self
end
log()
click to toggle source
log_event_loop()
click to toggle source
# Periodically drains @log_event_queue and emits each queued log event
# through @event_router.
#
# Log events are first disabled for the current thread via $log. Each
# iteration then sleeps for LOG_EMIT_INTERVAL, exits if
# @log_event_loop_stop is set, and otherwise removes everything currently
# in the queue in a single slice! call before emitting the entries one by
# one. A StandardError raised while emitting one entry is logged through
# $log.error and does not stop the remaining entries from being processed.
def log_event_loop
  $log.disable_events(Thread.current)

  while sleep(LOG_EMIT_INTERVAL)
    break if @log_event_loop_stop
    next if @log_event_queue.empty?

    # Take the whole backlog out of the queue in one step.
    pending = @log_event_queue.slice!(0..-1)
    next if pending.empty?

    pending.each do |tag, time, record|
      begin
        @event_router.emit(tag, time, record)
      rescue => err
        $log.error "failed to emit fluentd's log event", tag: tag, event: record, error: err
      end
    end
  end
end
now()
click to toggle source
# Returns the current time as reported by Fluent::EventTime.now.
def now
  Fluent::EventTime.now
end
parse_config(io, fname, basepath = Dir.pwd, v1_config = false)
click to toggle source
# Parses the configuration read from +io+, choosing the parser by +fname+.
#
# io        - configuration source, handed unchanged to the chosen parser.
# fname     - configuration file name; a name ending in ".rb" selects
#             Config::DSL::Parser, anything else goes to Config.parse.
# basepath  - base directory, defaulting to Dir.pwd. The DSL branch joins it
#             with +fname+; the other branch forwards it as-is.
# v1_config - forwarded to Config.parse only; the DSL branch does not use it.
#
# Returns whatever the selected parser returns.
def parse_config(io, fname, basepath = Dir.pwd, v1_config = false)
  # \z anchors at the real end of the string. A `$` anchor would also match
  # right before an embedded newline, so a name such as "x.rb\n.conf" would
  # be treated as a Ruby DSL file.
  if fname =~ /\.rb\z/
    # The DSL is required only on this branch.
    require 'fluent/config/dsl'
    Config::DSL::Parser.parse(io, File.join(basepath, fname))
  else
    Config.parse(io, fname, basepath, v1_config)
  end
end
push_log_event(tag, time, record)
click to toggle source
# Appends [tag, time, record] to @log_event_queue.
#
# Does nothing and returns nil while @log_emit_thread is nil; otherwise
# returns the result of the push.
def push_log_event(tag, time, record)
  @log_event_queue.push([tag, time, record]) unless @log_emit_thread.nil?
end
run()
click to toggle source
# Runs the engine until @engine_stopped becomes truthy, then shuts it down.
#
# After `start`, if @event_router matches $log.tag, log events are enabled
# and a thread running log_event_loop is stored in @log_emit_thread. The
# calling thread then sleeps in MAINLOOP_SLEEP_INTERVAL steps until
# @engine_stopped is set.
#
# Any exception raised during that phase is logged through $log and
# re-raised, so the shutdown steps below are skipped in that case.
#
# On a normal exit, `shutdown` is called and, when a log emit thread
# exists, it is asked to stop via @log_event_loop_stop and joined.
def run
  begin
    start

    if @event_router.match?($log.tag)
      $log.enable_event
      @log_emit_thread = Thread.new(&method(:log_event_loop))
    end

    until @engine_stopped
      sleep MAINLOOP_SLEEP_INTERVAL
    end
  rescue Exception => err
    $log.error "unexpected error", error: err
    $log.error_backtrace
    raise
  end

  $log.info "shutting down fluentd"
  shutdown

  return unless @log_emit_thread

  @log_event_loop_stop = true
  @log_emit_thread.join
end
stop()
click to toggle source
# Sets @engine_stopped to true.
#
# Always returns nil.
def stop
  @engine_stopped = true

  nil
end
suppress_interval(interval_time)
click to toggle source
# Records +interval_time+ in @suppress_emit_error_log_interval and sets
# @next_emit_error_log_time to the current time in whole epoch seconds.
#
# Returns the timestamp stored in @next_emit_error_log_time.
def suppress_interval(interval_time)
  current_epoch = Time.now.to_i

  @suppress_emit_error_log_interval = interval_time
  @next_emit_error_log_time = current_epoch
end