class MQBench::Kafka

Public Class Methods

new(args) click to toggle source
Calls superclass method MQBench::Client::new
# File lib/mqbench/kafka.rb, line 5
def initialize(args)
  @port = 9092
  @host = 'localhost'
  
  super(args)
  
  @broker = ::Kafka.new(seed_brokers: ["#{@host}:#{@port}"])
end

Public Instance Methods

recv_msg() click to toggle source
# File lib/mqbench/kafka.rb, line 27
def recv_msg
  consumer = @broker.consumer(group_id: 'test')
  
  # It's possible to subscribe to multiple topics by calling `subscribe`
  # repeatedly.
  consumer.subscribe(QNAME)
  
  # This will loop indefinitely, yielding each message in turn.
  current = 1
  consumer.each_message do |message|
    current += 1
    if(current >= @count)
      break
    end
  end
end
send_msg() click to toggle source
# File lib/mqbench/kafka.rb, line 14
def send_msg
  producer = @broker.producer(:required_acks => 0,
                              :max_buffer_size => (@count * @size),
                              :max_buffer_bytesize => (@count * (@size + 100)))

  (1..@count).each do |x|
    producer.produce('a' * @size, topic: QNAME)
    producer.deliver_messages
  end

  producer.shutdown
end