class Fluent::BufferedFilterOutput

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_buffered_filter.rb, line 11
def configure(conf)
  super

  unless File.exist?(@filter_path)
    raise Fluent::ConfigError, "No such file: #{@filter_path}"
  end

  begin
    @filter = Object.new.instance_eval(File.read(@filter_path), @filter_path)
  rescue => e
    raise Fluent::ConfigError, "Invalid filter: #{@filter_path}: #{e}"
  end

  unless @filter.respond_to?(:call)
    raise Fluent::ConfigError, "`call` method not implemented in filter: #{@filter_path}"
  end
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_buffered_filter.rb, line 29
def format(tag, time, record)
  [tag, time, record].to_msgpack
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_buffered_filter.rb, line 33
def write(chunk)
  chunk.msgpack_each do |tag, time, record|
    records = @filter.call(tag, time, record)
    records = [records] unless records.kind_of?(Array)
    emit_records(tag, time, records)
  end
end

Private Instance Methods

emit_records(tag, time, records) click to toggle source
# File lib/fluent/plugin/out_buffered_filter.rb, line 43
def emit_records(tag, time, records)
  records.each do |record|
    if record.kind_of?(Hash)
      Fluent::Engine.emit("#{@prefix}.#{tag}", time, record)
    else
      log.warn("Record must be Hash: #{record.inspect} (#{record.class})")
    end
  end
end