# File lib/fluent/process.rb, line 242
# Sets up an empty per-tag buffer guarded by a mutex and starts the
# background thread that executes #run.
#
# w        - stored as @w; the destination handed to +to_msgpack+ by #run.
# interval - stored as @interval; the value #run passes to +sleep+
#            between flushes.
def initialize(w, interval)
  @mutex = Mutex.new
  @buffer = {}
  @interval = interval
  @w = w

  # Start the worker last so every instance variable it reads is already set.
  Thread.new { run }
end
# File lib/fluent/process.rb, line 250
# Serializes +es+ with +to_msgpack_stream+ and appends the result to the
# pending buffer for +tag+. The buffer is updated while holding @mutex.
#
# tag - key under which the serialized stream is accumulated.
# es  - any object responding to +to_msgpack_stream+ (the result is
#       appended with String#<<, so it is presumably a String).
def emit(tag, es)
  stream = es.to_msgpack_stream

  @mutex.synchronize do
    if (pending = @buffer[tag])
      pending << stream
    else
      # Keep a private copy: later emits append to this buffer in place, and
      # that must not mutate (or fail on a frozen) string owned by +es+.
      @buffer[tag] = stream.dup
    end
  end
end
# File lib/fluent/process.rb, line 261
# Flush loop: sleeps for @interval, drains every buffered [tag, stream]
# pair while holding @mutex, then serializes each pair to @w with
# +to_msgpack+ outside the lock. Runs until an error is raised.
#
# Any StandardError is logged through $log and then re-raised.
def run
  # `while true` rather than Kernel#loop: loop would silently swallow
  # StopIteration instead of letting it reach the rescue below.
  while true
    sleep @interval

    # Hold the lock only long enough to take ownership of the pending data.
    drained = @mutex.synchronize do
      @buffer.keys.each_with_object([]) do |tag, acc|
        chunk = @buffer.delete(tag)
        acc << [tag, chunk] if chunk
      end
    end

    drained.each { |entry| entry.to_msgpack(@w) }
  end
rescue
  $log.error "error on forwarder thread", error: $!.to_s
  $log.error_backtrace
  raise
end