class Fluent::KafkaInput
Public Class Methods
new()
Calls superclass method
# File lib/fluent/plugin/in_kafka.rb, line 69
def initialize
  super
  require 'kafka'
  @time_parser = nil
end
Public Instance Methods
add_offset_in_hash(hash, te, offset)
# File lib/fluent/plugin/in_kafka.rb, line 183
def add_offset_in_hash(hash, te, offset)
  hash['kafka_topic'.freeze] = te.topic
  hash['kafka_partition'.freeze] = te.partition
  hash['kafka_offset'.freeze] = offset
end
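For illustration, a minimal sketch of the effect on a record when add_offset_in_record is enabled; the Struct stands in for a real TopicEntry, and the topic name and offset are made-up placeholder values:

  te = Struct.new(:topic, :partition).new('app.logs', 0)  # stand-in for a TopicEntry
  record = { 'message' => 'hello' }
  add_offset_in_hash(record, te, 42)
  record #=> {"message"=>"hello", "kafka_topic"=>"app.logs",
         #    "kafka_partition"=>0, "kafka_offset"=>42}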
configure(conf)
Calls superclass method
# File lib/fluent/plugin/in_kafka.rb, line 76
def configure(conf)
  super

  @topic_list = []
  if @topics
    @topic_list = @topics.split(',').map { |topic|
      TopicEntry.new(topic.strip, @partition, @offset)
    }
  else
    conf.elements.select { |element| element.name == 'topic' }.each do |element|
      unless element.has_key?('topic')
        raise Fluent::ConfigError, "kafka: 'topic' is a required parameter in 'topic element'."
      end
      partition = element.has_key?('partition') ? element['partition'].to_i : 0
      offset = element.has_key?('offset') ? element['offset'].to_i : -1
      @topic_list.push(TopicEntry.new(element['topic'], partition, offset))
    end
  end

  if @topic_list.empty?
    raise Fluent::ConfigError, "kafka: 'topics' or 'topic element' is a required parameter"
  end

  # For backward compatibility: assemble @brokers from the deprecated host/port pair
  @brokers = case
             when @host && @port
               ["#{@host}:#{@port}"]
             when @host
               ["#{@host}:9092"]
             when @port
               ["localhost:#{@port}"]
             else
               @brokers
             end

  if conf['max_wait_ms']
    log.warn "'max_wait_ms' parameter is deprecated. Use second unit 'max_wait_time' instead"
    @max_wait_time = conf['max_wait_ms'].to_i / 1000
  end

  @max_wait_time = @interval if @max_wait_time.nil?

  require 'zookeeper' if @offset_zookeeper

  @parser_proc = setup_parser(conf)

  @time_source = :record if @use_record_time

  if @time_source == :record and @time_format
    # Fluent::TimeParser exists on newer Fluentd; fall back to the old class otherwise
    if defined?(Fluent::TimeParser)
      @time_parser = Fluent::TimeParser.new(@time_format)
    else
      @time_parser = Fluent::TextParser::TimeParser.new(@time_format)
    end
  end
end
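For reference, a minimal sketch of the two ways this method accepts topics, in ordinary Fluentd source syntax; the broker address and topic names are placeholders. The flat form shares the global partition/offset defaults across all topics:

  <source>
    @type kafka
    brokers localhost:9092
    format json
    topics app.logs,app.metrics
  </source>

The element form sets partition and starting offset per topic:

  <source>
    @type kafka
    brokers localhost:9092
    format json
    <topic>
      topic app.logs
      partition 0
      offset -1
    </topic>
  </source>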
run()
# File lib/fluent/plugin/in_kafka.rb, line 250
def run
  @loop.run
rescue => e
  $log.error "unexpected error", :error => e.to_s
  $log.error_backtrace
end
setup_parser(conf)
# File lib/fluent/plugin/in_kafka.rb, line 133
def setup_parser(conf)
  case @format
  when 'json'
    begin
      # Prefer Oj for JSON decoding; fall back to Yajl when Oj is not installed
      require 'oj'
      Oj.default_options = Fluent::DEFAULT_OJ_OPTIONS
      Proc.new { |msg, te|
        r = Oj.load(msg.value)
        add_offset_in_hash(r, te, msg.offset) if @add_offset_in_record
        r
      }
    rescue LoadError
      require 'yajl'
      Proc.new { |msg, te|
        r = Yajl::Parser.parse(msg.value)
        add_offset_in_hash(r, te, msg.offset) if @add_offset_in_record
        r
      }
    end
  when 'ltsv'
    require 'ltsv'
    Proc.new { |msg, te|
      r = LTSV.parse(msg.value, {:symbolize_keys => false}).first
      add_offset_in_hash(r, te, msg.offset) if @add_offset_in_record
      r
    }
  when 'msgpack'
    require 'msgpack'
    Proc.new { |msg, te|
      r = MessagePack.unpack(msg.value)
      add_offset_in_hash(r, te, msg.offset) if @add_offset_in_record
      r
    }
  when 'text'
    Proc.new { |msg, te|
      r = {@message_key => msg.value}
      add_offset_in_hash(r, te, msg.offset) if @add_offset_in_record
      r
    }
  else
    # Any other format name is resolved through Fluentd's parser plugin registry
    @custom_parser = Fluent::Plugin.new_parser(conf['format'])
    @custom_parser.configure(conf)
    Proc.new { |msg|
      @custom_parser.parse(msg.value) {|_time, record|
        record
      }
    }
  end
end
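Each branch returns a proc that takes a fetched message and its topic entry. To make the contract concrete, a self-contained sketch of what the 'json' branch does; the Struct stands in for a real Kafka message, and this assumes Oj is installed:

  require 'oj'

  Msg = Struct.new(:value, :offset)       # stand-in for a fetched Kafka message
  msg = Msg.new('{"level":"info"}', 42)
  record = Oj.load(msg.value)             # the 'json' branch's parse step
  record['kafka_offset'] = msg.offset     # added only when add_offset_in_record is set
  record #=> {"level"=>"info", "kafka_offset"=>42}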
shutdown()
Calls superclass method
# File lib/fluent/plugin/in_kafka.rb, line 242
def shutdown
  @loop.stop
  @zookeeper.close! if @zookeeper
  @thread.join
  @kafka.close
  super
end
start()
Calls superclass method Fluent::KafkaPluginUtil::SSLSettings#start
# File lib/fluent/plugin/in_kafka.rb, line 189
def start
  super

  @loop = Coolio::Loop.new
  opt = {}
  opt[:max_bytes] = @max_bytes if @max_bytes
  opt[:max_wait_time] = @max_wait_time if @max_wait_time
  opt[:min_bytes] = @min_bytes if @min_bytes

  logger = @get_kafka_client_log ? log : nil
  if @scram_mechanism != nil && @username != nil && @password != nil
    # SASL SCRAM authentication
    @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger,
                       ssl_ca_cert_file_path: @ssl_ca_cert,
                       ssl_client_cert: read_ssl_file(@ssl_client_cert),
                       ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
                       ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
                       sasl_scram_username: @username, sasl_scram_password: @password,
                       sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl,
                       ssl_verify_hostname: @ssl_verify_hostname)
  elsif @username != nil && @password != nil
    # SASL PLAIN authentication
    @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger,
                       ssl_ca_cert_file_path: @ssl_ca_cert,
                       ssl_client_cert: read_ssl_file(@ssl_client_cert),
                       ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
                       ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
                       sasl_plain_username: @username, sasl_plain_password: @password,
                       sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
  else
    # SASL GSSAPI (Kerberos), or unauthenticated when principal/keytab are nil
    @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger,
                       ssl_ca_cert_file_path: @ssl_ca_cert,
                       ssl_client_cert: read_ssl_file(@ssl_client_cert),
                       ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
                       ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
                       sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab,
                       ssl_verify_hostname: @ssl_verify_hostname)
  end

  @zookeeper = Zookeeper.new(@offset_zookeeper) if @offset_zookeeper

  @topic_watchers = @topic_list.map {|topic_entry|
    offset_manager = OffsetManager.new(topic_entry, @zookeeper, @offset_zk_root_node) if @offset_zookeeper
    TopicWatcher.new(
      topic_entry,
      @kafka,
      interval,
      @parser_proc,
      @add_prefix,
      @add_suffix,
      offset_manager,
      router,
      @kafka_message_key,
      @time_source,
      @record_time_key,
      @tag_source,
      @record_tag_key,
      opt)
  }
  @topic_watchers.each {|tw|
    tw.attach(@loop)
  }
  @thread = Thread.new(&method(:run))
end
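The three Kafka.new calls differ only in their SASL arguments, so branch order determines precedence: SCRAM wins when a mechanism plus username and password are all set, plain username/password comes next, and the fall-through branch passes the GSSAPI principal and keytab. A trimmed sketch of that precedence, assuming ruby-kafka's keyword arguments; the helper name and values are hypothetical:

  require 'kafka'

  # Hypothetical helper mirroring the branch order in start
  def build_client(brokers, username: nil, password: nil, scram_mechanism: nil,
                   principal: nil, keytab: nil)
    if scram_mechanism && username && password
      Kafka.new(seed_brokers: brokers,
                sasl_scram_username: username, sasl_scram_password: password,
                sasl_scram_mechanism: scram_mechanism)
    elsif username && password
      Kafka.new(seed_brokers: brokers,
                sasl_plain_username: username, sasl_plain_password: password)
    else
      Kafka.new(seed_brokers: brokers,
                sasl_gssapi_principal: principal, sasl_gssapi_keytab: keytab)
    end
  end

  client = build_client(['localhost:9092'])  # no SASL settings => plain connection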