class BPS::Publisher::Kafka

Constants

CLIENT_OPTS
PRODUCER_OPTS

Public Class Methods

coercer() click to toggle source

@return [BPS::Coercer] the options coercer.

# File lib/bps/publisher/kafka.rb, line 75
def self.coercer
  @coercer ||= BPS::Coercer.new(CLIENT_OPTS.merge(PRODUCER_OPTS)).freeze
end
new(broker_addrs, **opts) click to toggle source

@param [Array<String>,URI] brokers the seed broker addresses. @param [Hash] opts the options. @see www.rubydoc.info/gems/ruby-kafka/Kafka/Client#initialize-instance_method

Calls superclass method
# File lib/bps/publisher/kafka.rb, line 82
def initialize(broker_addrs, **opts)
  super()

  broker_addrs = parse_url(broker_addrs) if broker_addrs.is_a?(URI)
  @topics = {}
  @client = ::Kafka.new(broker_addrs, **opts.slice(*CLIENT_OPTS.keys))
  @producer = init_producer(**opts.slice(*PRODUCER_OPTS.keys))
end

Public Instance Methods

close() click to toggle source
# File lib/bps/publisher/kafka.rb, line 95
def close
  @producer.shutdown
  @client.close
end
topic(name) click to toggle source
# File lib/bps/publisher/kafka.rb, line 91
def topic(name)
  @topics[name] ||= self.class::Topic.new(@producer, name)
end

Private Instance Methods

init_producer(**opts) click to toggle source
# File lib/bps/publisher/kafka.rb, line 110
def init_producer(**opts)
  @producer = @client.producer(**opts)
end
parse_url(url) click to toggle source
# File lib/bps/publisher/kafka.rb, line 102
def parse_url(url)
  port = url.port&.to_s || '9092'
  CGI.unescape(url.host).split(',').map do |addr|
    addr << ':' << port unless /:\d+$/.match?(addr)
    addr
  end
end