class Fluent::KafkaInput

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kafka.rb, line 69
def initialize
  super
  require 'kafka'

  @time_parser = nil
end

Public Instance Methods

add_offset_in_hash(hash, te, offset) click to toggle source
# File lib/fluent/plugin/in_kafka.rb, line 183
def add_offset_in_hash(hash, te, offset)
  hash['kafka_topic'.freeze] = te.topic
  hash['kafka_partition'.freeze] = te.partition
  hash['kafka_offset'.freeze] = offset
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kafka.rb, line 76
def configure(conf)
  super

  @topic_list = []
  if @topics
    @topic_list = @topics.split(',').map { |topic|
      TopicEntry.new(topic.strip, @partition, @offset)
    }
  else
    conf.elements.select { |element| element.name == 'topic' }.each do |element|
      unless element.has_key?('topic')
        raise Fluent::ConfigError, "kafka: 'topic' is a require parameter in 'topic element'."
      end
      partition = element.has_key?('partition') ? element['partition'].to_i : 0
      offset = element.has_key?('offset') ? element['offset'].to_i : -1
      @topic_list.push(TopicEntry.new(element['topic'], partition, offset))
    end
  end

  if @topic_list.empty?
    raise Fluent::ConfigError, "kafka: 'topics' or 'topic element' is a require parameter"
  end

  # For backward compatibility
  @brokers = case
             when @host && @port
               ["#{@host}:#{@port}"]
             when @host
               ["#{@host}:9092"]
             when @port
               ["localhost:#{@port}"]
             else
               @brokers
             end

  if conf['max_wait_ms']
    log.warn "'max_wait_ms' parameter is deprecated. Use second unit 'max_wait_time' instead"
    @max_wait_time = conf['max_wait_ms'].to_i / 1000
  end

  @max_wait_time = @interval if @max_wait_time.nil?

  require 'zookeeper' if @offset_zookeeper

  @parser_proc = setup_parser(conf)

  @time_source = :record if @use_record_time

  if @time_source == :record and @time_format
    if defined?(Fluent::TimeParser)
      @time_parser = Fluent::TimeParser.new(@time_format)
    else
      @time_parser = Fluent::TextParser::TimeParser.new(@time_format)
    end
  end
end
run() click to toggle source
# File lib/fluent/plugin/in_kafka.rb, line 250
def run
  @loop.run
rescue => e
  $log.error "unexpected error", :error => e.to_s
  $log.error_backtrace
end
setup_parser(conf) click to toggle source
# File lib/fluent/plugin/in_kafka.rb, line 133
def setup_parser(conf)
  case @format
  when 'json'
    begin
      require 'oj'
      Oj.default_options = Fluent::DEFAULT_OJ_OPTIONS
      Proc.new { |msg, te|
        r = Oj.load(msg.value)
        add_offset_in_hash(r, te, msg.offset) if @add_offset_in_record
        r
      }
    rescue LoadError
      require 'yajl'
      Proc.new { |msg, te|
        r = Yajl::Parser.parse(msg.value)
        add_offset_in_hash(r, te, msg.offset) if @add_offset_in_record
        r
      }
    end
  when 'ltsv'
    require 'ltsv'
    Proc.new { |msg, te|
      r = LTSV.parse(msg.value, {:symbolize_keys => false}).first
      add_offset_in_hash(r, te, msg.offset) if @add_offset_in_record
      r
    }
  when 'msgpack'
    require 'msgpack'
    Proc.new { |msg, te|
      r = MessagePack.unpack(msg.value)
      add_offset_in_hash(r, te, msg.offset) if @add_offset_in_record
      r
    }
  when 'text'
    Proc.new { |msg, te|
      r = {@message_key => msg.value}
      add_offset_in_hash(r, te, msg.offset) if @add_offset_in_record
      r
    }
  else
    @custom_parser = Fluent::Plugin.new_parser(conf['format'])
    @custom_parser.configure(conf)
    Proc.new { |msg|
      @custom_parser.parse(msg.value) {|_time, record|
        record
      }
    }
  end
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kafka.rb, line 242
def shutdown
  @loop.stop
  @zookeeper.close! if @zookeeper
  @thread.join
  @kafka.close
  super
end
start() click to toggle source
# File lib/fluent/plugin/in_kafka.rb, line 189
def start
  super

  @loop = Coolio::Loop.new
  opt = {}
  opt[:max_bytes] = @max_bytes if @max_bytes
  opt[:max_wait_time] = @max_wait_time if @max_wait_time
  opt[:min_bytes] = @min_bytes if @min_bytes

  logger = @get_kafka_client_log ? log : nil
  if @scram_mechanism != nil && @username != nil && @password != nil
    @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert_file_path: @ssl_ca_cert,
                       ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
                       ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_scram_username: @username, sasl_scram_password: @password,
                       sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
  elsif @username != nil && @password != nil
    @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert_file_path: @ssl_ca_cert,
                       ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
                       ssl_ca_certs_from_system: @ssl_ca_certs_from_system,sasl_plain_username: @username, sasl_plain_password: @password,
                       sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
  else
    @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert_file_path: @ssl_ca_cert,
                       ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
                       ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab,
                       ssl_verify_hostname: @ssl_verify_hostname)
  end

  @zookeeper = Zookeeper.new(@offset_zookeeper) if @offset_zookeeper

  @topic_watchers = @topic_list.map {|topic_entry|
    offset_manager = OffsetManager.new(topic_entry, @zookeeper, @offset_zk_root_node) if @offset_zookeeper
    TopicWatcher.new(
      topic_entry,
      @kafka,
      interval,
      @parser_proc,
      @add_prefix,
      @add_suffix,
      offset_manager,
      router,
      @kafka_message_key,
      @time_source,
      @record_time_key,
      @tag_source,
      @record_tag_key,
      opt)
  }
  @topic_watchers.each {|tw|
    tw.attach(@loop)
  }
  @thread = Thread.new(&method(:run))
end