class Fluent::BufferedFilterOutput
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_buffered_filter.rb, line 11 def configure(conf) super unless File.exist?(@filter_path) raise Fluent::ConfigError, "No such file: #{@filter_path}" end begin @filter = Object.new.instance_eval(File.read(@filter_path), @filter_path) rescue => e raise Fluent::ConfigError, "Invalid filter: #{@filter_path}: #{e}" end unless @filter.respond_to?(:call) raise Fluent::ConfigError, "`call` method not implemented in filter: #{@filter_path}" end end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_buffered_filter.rb, line 29 def format(tag, time, record) [tag, time, record].to_msgpack end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_buffered_filter.rb, line 33 def write(chunk) chunk.msgpack_each do |tag, time, record| records = @filter.call(tag, time, record) records = [records] unless records.kind_of?(Array) emit_records(tag, time, records) end end
Private Instance Methods
emit_records(tag, time, records)
click to toggle source
# File lib/fluent/plugin/out_buffered_filter.rb, line 43 def emit_records(tag, time, records) records.each do |record| if record.kind_of?(Hash) Fluent::Engine.emit("#{@prefix}.#{tag}", time, record) else log.warn("Record must be Hash: #{record.inspect} (#{record.class})") end end end