class FFWD::Plugin::Kafka::AttributePartitioner

Partitioner that uses the value of a configurable attribute as the partition key.

Constants

DEFAULT_ATTRIBUTE
OPTIONS

Public Class Methods

new(config)
# File lib/ffwd/plugin/kafka/partitioners.rb, line 49
def initialize config
  @attr = config[:attribute].to_sym
  @attr_s = config[:attribute].to_s
end

prepare(config)
# File lib/ffwd/plugin/kafka/partitioners.rb, line 44
def self.prepare config
  config[:attribute] ||= DEFAULT_ATTRIBUTE
  config
end
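
The defaulting behaviour of prepare can be sketched in isolation. This is a minimal stand-in, not the library itself: the module name PrepareSketch is hypothetical, and the value :site for DEFAULT_ATTRIBUTE is an assumption, since this page lists the constant without showing its value.

```ruby
# Stand-in module for illustration; not part of FFWD.
module PrepareSketch
  # Assumed placeholder value; the real constant's value is not shown here.
  DEFAULT_ATTRIBUTE = :site

  # Same logic as the prepare listing above: fill in :attribute only when
  # the caller left it unset (or set it to nil/false), then return the hash.
  def self.prepare(config)
    config[:attribute] ||= DEFAULT_ATTRIBUTE
    config
  end
end

explicit  = PrepareSketch.prepare({ attribute: "host" })
defaulted = PrepareSketch.prepare({})
```

Because `||=` mutates the hash it is given, the config object passed in is the same object that comes back.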

Public Instance Methods

partition(d)

Attribute keys can currently be stored as either symbols or strings, so both forms must be taken into account. The symbol key is looked up first, then the string key.

# File lib/ffwd/plugin/kafka/partitioners.rb, line 56
def partition d
  if v = d.attributes[@attr]
    return v
  end

  d.attributes[@attr_s]
end
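
The symbol-then-string lookup can be exercised with a small stand-in. This is a sketch under stated assumptions: SketchAttributePartitioner copies the logic of the listings above rather than loading the real class, FakeEvent is a hypothetical object exposing only an attributes hash, and the attribute name "site" is chosen purely for illustration.

```ruby
# Stand-in that mirrors the initialize and partition listings above.
class SketchAttributePartitioner
  def initialize(config)
    @attr   = config[:attribute].to_sym
    @attr_s = config[:attribute].to_s
  end

  # Prefer the symbol key; fall back to the string key when the symbol
  # lookup yields nil or false.
  def partition(d)
    d.attributes[@attr] || d.attributes[@attr_s]
  end
end

# Hypothetical event type: only the attributes accessor matters here.
FakeEvent = Struct.new(:attributes)

partitioner = SketchAttributePartitioner.new({ attribute: "site" })

from_symbol = partitioner.partition(FakeEvent.new({ site: "lon" }))
from_string = partitioner.partition(FakeEvent.new({ "site" => "sto" }))
from_both   = partitioner.partition(FakeEvent.new({ site: "lon", "site" => "sto" }))
from_none   = partitioner.partition(FakeEvent.new({}))
```

When both key forms are present the symbol key wins, and a symbol key holding nil or false is treated the same as a missing one, so the string key is consulted in that case.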