class FFWD::Plugin::Kafka::Output
Constants
- DEFAULT_BROKERS
- DEFAULT_PRODUCER
- MAPPING
Attributes
reporter_meta[R]
Public Class Methods
new(schema, router, partitioner, config)
click to toggle source
# File lib/ffwd/plugin/kafka/output.rb, line 44
# Stores the collaborators and connection settings for this output.
#
# schema, router and partitioner are kept as-is for later use when
# building messages. config is indexed with :producer and :brokers.
# The producer instance itself is not created here; @instance stays nil
# until #setup assigns it.
def initialize(schema, router, partitioner, config)
  @schema, @router, @partitioner = schema, router, partitioner
  @producer = config[:producer]
  @brokers = config[:brokers]
  # Exposed through the reporter_meta reader.
  @reporter_meta = { producer_type: "kafka", producer: @producer }
  @instance = nil
end
prepare(config)
click to toggle source
# File lib/ffwd/plugin/kafka/output.rb, line 38
# Fills in missing connection settings on the given config, in place.
#
# A :producer or :brokers entry that is absent (or otherwise falsy) is
# replaced with DEFAULT_PRODUCER / DEFAULT_BROKERS respectively; entries
# that are already set are left untouched. Returns the same config object.
def self.prepare(config)
  config[:producer] = DEFAULT_PRODUCER unless config[:producer]
  config[:brokers] = DEFAULT_BROKERS unless config[:brokers]
  config
end
Public Instance Methods
make_event_message(e)
click to toggle source
# File lib/ffwd/plugin/kafka/output.rb, line 98
# Builds the outgoing message for a single event.
#
# Asks the router for a topic first; when it answers nil the event is
# not serialized or partitioned and nil is returned. Otherwise returns a
# MessageToSend built from the topic, the schema's dump of the event and
# the partitioner's key for it.
def make_event_message(e)
  destination = @router.route_event(e)

  if destination.nil?
    nil
  else
    payload = @schema.dump_event(e)
    partition_key = @partitioner.partition(e)
    MessageToSend.new(destination, payload, partition_key)
  end
end
make_metric_message(m)
click to toggle source
# File lib/ffwd/plugin/kafka/output.rb, line 106
# Builds the outgoing message for a single metric.
#
# Asks the router for a topic first; when it answers nil the metric is
# not serialized or partitioned and nil is returned. Otherwise returns a
# MessageToSend built from the topic, the schema's dump of the metric and
# the partitioner's key for it.
def make_metric_message(m)
  destination = @router.route_metric(m)

  if destination.nil?
    nil
  else
    payload = @schema.dump_metric(m)
    partition_key = @partitioner.partition(m)
    MessageToSend.new(destination, payload, partition_key)
  end
end
produce(events, metrics)
click to toggle source
# File lib/ffwd/plugin/kafka/output.rb, line 70
# Converts the given events and metrics into messages and hands them to
# the producer instance in a single send_messages call.
#
# Returns nil without doing anything when no producer instance is set.
# Items for which no message could be built (the make_* helper returned
# nil) are left out of the batch and counted under :routing_error; the
# number of messages actually built is counted under :routing_success.
# Returns whatever the producer's send_messages returns.
def produce(events, metrics)
  return nil unless @instance

  total_inputs = events.size + metrics.size
  outgoing = []

  # Events go first, then metrics, preserving input order within each.
  events.each do |event|
    built = make_event_message(event)
    outgoing << built unless built.nil?
  end

  metrics.each do |metric|
    built = make_metric_message(metric)
    outgoing << built unless built.nil?
  end

  unrouted = total_inputs - outgoing.size
  increment(:routing_error, unrouted) if unrouted > 0
  increment(:routing_success, outgoing.size)

  @instance.send_messages(outgoing)
end
setup()
click to toggle source
# File lib/ffwd/plugin/kafka/output.rb, line 54
# Creates the producer instance from the configured brokers and producer
# setting.
#
# When the broker list is missing or empty, an error is logged, no
# instance is created and nil is returned. Otherwise the new Producer is
# stored in @instance and returned.
def setup
  brokers_usable = @brokers && !@brokers.empty?

  if brokers_usable
    @instance = Producer.new(@brokers, @producer)
  else
    log.error("No usable initial list of brokers")
    nil
  end
end
teardown()
click to toggle source
# File lib/ffwd/plugin/kafka/output.rb, line 63
# Stops the producer instance, if one is set, and clears the reference
# to it. Does nothing when no instance is set. Always returns nil.
def teardown
  return unless @instance

  @instance.stop
  @instance = nil
end