class FFWD::Plugin::Kafka::Producer

A Kafka producer proxy for Poseidon (a Kafka client library) that delegates all blocking work to the EventMachine thread pool.

Public Class Methods

new(*args) click to toggle source
# File lib/ffwd/plugin/kafka/producer.rb, line 28
# Set up the proxy without creating the underlying producer.
#
# args are stored untouched and later splatted into Poseidon::Producer.new
# by make_producer. The proxy starts with no pending request and not stopped.
def initialize *args
  @stopped = false
  @request = nil
  @mutex   = Mutex.new
  @args    = args
end

Public Instance Methods

execute(&block) click to toggle source

Execute the provided block on the EventMachine thread pool (via EM.defer), off the reactor thread. The block receives a single argument: the Poseidon::Producer instance returned by make_producer.

# File lib/ffwd/plugin/kafka/producer.rb, line 66
# Run the given block through EM.defer and report its outcome via a Request.
#
# The block is called with the object returned by make_producer as its only
# argument. Its return value is passed to Request#succeed; a StandardError
# raised by it (or by make_producer) is passed to Request#fail. Both
# notifications are scheduled with EM.next_tick.
#
# Only one request may be pending at a time. The pending slot is released
# *before* the request is settled, so code triggered by succeed/fail (e.g.
# completion callbacks, presumably EM::Deferrable ones - confirm against
# Request) can immediately call execute again instead of hitting
# "Request already pending".
#
# Returns the Request tracking this call. When the proxy has been stopped, a
# Request that has already been failed with "producer stopped" is returned
# and the block is never run.
#
# Raises RuntimeError when no block is given or a request is already pending.
def execute &block
  raise "Expected block" unless block_given?
  raise "Request already pending" if @request

  if @stopped
    rejected = Request.new
    rejected.fail "producer stopped"
    return rejected
  end

  # Keep a local handle: @request is cleared before the request is settled
  # and may already point at a follow-up request by then.
  request = @request = Request.new

  # Shared completion path for success and failure.
  settle = lambda do |notify|
    EM.next_tick do
      # Sample the flag first: if stop is called while notifying, it performs
      # the shutdown itself (the slot is free), so it must not be repeated.
      was_stopped = @stopped
      @request = nil
      notify.call
      # A stop that arrived while the request was pending was deferred by
      # shutdown; carry it out now.
      shutdown if was_stopped
    end
  end

  EM.defer do
    begin
      result = block.call make_producer
      settle.call lambda { request.succeed result }
    rescue => e
      settle.call lambda { request.fail e }
    end
  end

  request
end
make_producer() click to toggle source
# File lib/ffwd/plugin/kafka/producer.rb, line 54
# Return the shared Poseidon::Producer, creating it on first use.
#
# The producer is built from the arguments given to the constructor and
# memoized in @producer; creation is guarded by @mutex so concurrent callers
# end up with the same instance.
#
# Raises RuntimeError when invoked on the EventMachine reactor thread.
def make_producer
  raise "Should not be called in the reactor thread" if EM.reactor_thread?

  @mutex.synchronize { @producer ||= Poseidon::Producer.new(*@args) }
end
send_messages(messages) click to toggle source
# File lib/ffwd/plugin/kafka/producer.rb, line 48
# Send the given messages through the wrapped producer.
#
# Hands execute a block that calls send_messages on the producer it is given,
# and returns whatever execute returns.
def send_messages messages
  execute { |producer| producer.send_messages messages }
end
shutdown() click to toggle source
# File lib/ffwd/plugin/kafka/producer.rb, line 40
# Shut down the underlying producer, unless a request is still pending.
#
# Does nothing while @request is set. Also does nothing when no producer has
# been created yet: make_producer builds it lazily, so stopping a proxy that
# never served a request used to raise NoMethodError on nil.
#
# Returns the result of the producer's shutdown, or nil when nothing was done.
def shutdown
  return if @request

  @mutex.synchronize do
    # @producer stays nil until make_producer has run at least once.
    @producer.shutdown if @producer
  end
end
stop() click to toggle source
# File lib/ffwd/plugin/kafka/producer.rb, line 35
# Mark the proxy as stopped and try to shut the producer down.
#
# Once @stopped is set, execute rejects new work with a failed request.
# shutdown returns early while a request is pending; in that case execute
# calls shutdown again when the pending request completes.
def stop
  # Set the flag first so execute's completion path sees it.
  @stopped = true
  shutdown
end