class BPS::Publisher::Kafka
Constants
- CLIENT_OPTS
- PRODUCER_OPTS
Public Class Methods
coercer()
click to toggle source
@return [BPS::Coercer] the options coercer.
# File lib/bps/publisher/kafka.rb, line 75 def self.coercer @coercer ||= BPS::Coercer.new(CLIENT_OPTS.merge(PRODUCER_OPTS)).freeze end
new(broker_addrs, **opts)
click to toggle source
@param [Array<String>,URI] brokers the seed broker addresses. @param [Hash] opts the options. @see www.rubydoc.info/gems/ruby-kafka/Kafka/Client#initialize-instance_method
Calls superclass method
# File lib/bps/publisher/kafka.rb, line 82 def initialize(broker_addrs, **opts) super() broker_addrs = parse_url(broker_addrs) if broker_addrs.is_a?(URI) @topics = {} @client = ::Kafka.new(broker_addrs, **opts.slice(*CLIENT_OPTS.keys)) @producer = init_producer(**opts.slice(*PRODUCER_OPTS.keys)) end
Public Instance Methods
close()
click to toggle source
# File lib/bps/publisher/kafka.rb, line 95 def close @producer.shutdown @client.close end
topic(name)
click to toggle source
# File lib/bps/publisher/kafka.rb, line 91 def topic(name) @topics[name] ||= self.class::Topic.new(@producer, name) end
Private Instance Methods
init_producer(**opts)
click to toggle source
# File lib/bps/publisher/kafka.rb, line 110 def init_producer(**opts) @producer = @client.producer(**opts) end
parse_url(url)
click to toggle source
# File lib/bps/publisher/kafka.rb, line 102 def parse_url(url) port = url.port&.to_s || '9092' CGI.unescape(url.host).split(',').map do |addr| addr << ':' << port unless /:\d+$/.match?(addr) addr end end