class FFWD::Plugin::Kafka::Output

Constants

DEFAULT_BROKERS
DEFAULT_PRODUCER
MAPPING

Attributes

reporter_meta[R]

Public Class Methods

new(schema, router, partitioner, config) click to toggle source
# File lib/ffwd/plugin/kafka/output.rb, line 44
# Builds an output from its collaborators and the plugin configuration.
#
# schema, router and partitioner are stored as given. The :producer and
# :brokers entries are read from config. No producer instance exists yet
# after construction; @instance starts out as nil.
def initialize(schema, router, partitioner, config)
  @schema, @router, @partitioner = schema, router, partitioner
  @brokers = config[:brokers]
  @producer = config[:producer]
  # Metadata exposed through the reporter_meta reader.
  @reporter_meta = { producer_type: "kafka", producer: @producer }
  @instance = nil
end
prepare(config) click to toggle source
# File lib/ffwd/plugin/kafka/output.rb, line 38
# Fills in missing configuration entries in place and returns the same hash.
#
# :producer falls back to DEFAULT_PRODUCER and :brokers to DEFAULT_BROKERS
# whenever the existing entry is nil or false.
def self.prepare(config)
  config[:producer] = DEFAULT_PRODUCER unless config[:producer]
  config[:brokers] = DEFAULT_BROKERS unless config[:brokers]
  config
end

Public Instance Methods

make_event_message(e) click to toggle source
# File lib/ffwd/plugin/kafka/output.rb, line 98
# Turns an event into a MessageToSend.
#
# The router picks the topic; when it returns nil nothing else is called
# and the result is nil. Otherwise the message is built from the topic,
# the schema's dump of the event and the partitioner's key for it.
def make_event_message(e)
  topic = @router.route_event(e)
  return nil if topic.nil?

  # Arguments evaluate left to right: dump first, then partition.
  MessageToSend.new(topic, @schema.dump_event(e), @partitioner.partition(e))
end
make_metric_message(m) click to toggle source
# File lib/ffwd/plugin/kafka/output.rb, line 106
# Turns a metric into a MessageToSend.
#
# The router picks the topic; when it returns nil nothing else is called
# and the result is nil. Otherwise the message is built from the topic,
# the schema's dump of the metric and the partitioner's key for it.
def make_metric_message(m)
  topic = @router.route_metric(m)
  return nil if topic.nil?

  # Arguments evaluate left to right: dump first, then partition.
  MessageToSend.new(topic, @schema.dump_metric(m), @partitioner.partition(m))
end
produce(events, metrics) click to toggle source
# File lib/ffwd/plugin/kafka/output.rb, line 70
# Converts events and metrics into messages and sends them in one batch.
#
# Returns nil without doing anything when @instance is not set. Otherwise
# every event goes through make_event_message and every metric through
# make_metric_message; inputs for which those return nil are dropped and
# counted under :routing_error. The number of remaining messages is
# counted under :routing_success (even when it is zero), and the result
# of @instance.send_messages is returned.
def produce(events, metrics)
  return nil unless @instance

  total = events.size + metrics.size

  # Events are converted before metrics; nil results are discarded.
  routed = events.map { |event| make_event_message(event) }.compact
  routed.concat(metrics.map { |metric| make_metric_message(metric) }.compact)

  dropped = total - routed.size
  increment :routing_error, dropped if dropped > 0

  increment :routing_success, routed.size
  @instance.send_messages routed
end
setup() click to toggle source
# File lib/ffwd/plugin/kafka/output.rb, line 54
# Creates the producer instance from the configured brokers.
#
# When @brokers is missing or empty an error is logged, @instance is left
# untouched and nil is returned. Otherwise a Producer is built from
# @brokers and @producer, stored in @instance and returned.
def setup
  if @brokers && !@brokers.empty?
    @instance = Producer.new(@brokers, @producer)
  else
    log.error "No usable initial list of brokers"
    nil
  end
end
teardown() click to toggle source
# File lib/ffwd/plugin/kafka/output.rb, line 63
# Stops the producer instance, if there is one, and forgets it.
#
# Does nothing when @instance is not set. Always returns nil.
def teardown
  return nil unless @instance

  # Stop first; the reference is only cleared once stop has returned.
  @instance.stop
  @instance = nil
end