class Fluent::Plugin::JSONStreamInput

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_jsonstream.rb, line 23
def configure(conf)
  super
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/in_jsonstream.rb, line 27
def multi_workers_ready?
  true
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_jsonstream.rb, line 31
def start
  super

  server_create_connection(:in_jsonstream_server, @port, bind: @bind) do |conn|
    parser = newParser()
    conn.data do |data|
      begin
        parser << data
      rescue Yajl::ParseError
        parser = newParser()
      end
    end
  end
end

Private Instance Methods

newParser() click to toggle source
# File lib/fluent/plugin/in_jsonstream.rb, line 48
def newParser
  parser = Yajl::Parser.new(:symbolize_keys => false)
  parser.on_parse_complete = lambda { |record|
    tag = extract_tag_from_record(record)
    tag ||= @tag
    time ||= extract_time_from_record(record) || Fluent::EventTime.now

    # Use the recorded event time if available
    time = record.delete('timestamp').to_i if record.key?('timestamp')

    router.emit(tag, time, record)
  }
  parser
end