class FFWD::Plugin::Kafka::Setup

Attributes

config[R]

Public Class Methods

new(config) click to toggle source
# File lib/ffwd/plugin/kafka.rb, line 64
def initialize config
  @config = Output.prepare Hash[config]
  @config = FFWD.prepare_schema @config
  @config[:partitioner] = FFWD::Plugin::Kafka.prepare_partitioner(
    @config[:partitioner] || {})
  @config[:router] = FFWD::Plugin::Kafka.prepare_router(
    @config[:router] || {})
end

Public Instance Methods

connect(core) click to toggle source
# File lib/ffwd/plugin/kafka.rb, line 73
def connect core
  partitioner = FFWD::Plugin::Kafka.build_partitioner @config[:partitioner]
  router = FFWD::Plugin::Kafka.build_router @config[:router]
  schema = FFWD.parse_schema @config
  output = Output.new schema, router, partitioner, @config
  FFWD.producing_client core.output, output, @config
end