class FFWD::Plugin::Kafka::Producer
A Kafka producer proxy for Poseidon (a Kafka library) that delegates all blocking work to the EventMachine thread pool.
Public Class Methods
new(*args)
click to toggle source
# File lib/ffwd/plugin/kafka/producer.rb, line 28
# Set up the proxy without creating the underlying producer.
#
# @param args [Array] stored as-is; make_producer later splats them into
#   Poseidon::Producer.new.
def initialize(*args)
  @args = args
  # Serializes access to the underlying producer (see make_producer/shutdown).
  @mutex = Mutex.new
  # The request currently in flight, or nil when idle.
  @request = nil
  # Flipped to true by stop.
  @stopped = false
end
Public Instance Methods
execute(&block)
click to toggle source
Executes the provided block on a thread from the EventMachine thread pool (via EM.defer). The block's sole argument is an instance of Poseidon::Producer.
# File lib/ffwd/plugin/kafka/producer.rb, line 66
# Run the given block off the reactor thread (via EM.defer), passing it the
# producer returned by make_producer, and report the outcome through a Request.
#
# Only one request may be in flight at a time. The request is marked as no
# longer pending *before* its callbacks fire, so a callback may immediately
# chain another execute call without tripping the "already pending" guard.
#
# @yieldparam producer the object returned by make_producer
# @return [Request] the pending request; it is succeeded with the block's
#   result or failed with the raised exception on a later reactor tick. When
#   the producer has been stopped, an already-failed request is returned.
# @raise [RuntimeError] if no block is given or a request is already pending
def execute(&block)
  raise "Expected block" unless block_given?
  raise "Request already pending" if @request

  if @stopped
    rejected = Request.new
    rejected.fail "producer stopped"
    return rejected
  end

  # Keep a local handle: @request may be replaced by a chained execute call
  # made from within one of this request's callbacks.
  request = @request = Request.new

  EM.defer do
    begin
      result = block.call make_producer

      EM.next_tick do
        @request = nil
        request.succeed result
        # A stop issued while this request was pending deferred its shutdown.
        shutdown if @stopped
      end
    rescue => e
      EM.next_tick do
        @request = nil
        request.fail e
        shutdown if @stopped
      end
    end
  end

  request
end
make_producer()
click to toggle source
# File lib/ffwd/plugin/kafka/producer.rb, line 54
# Return the underlying Poseidon::Producer, building it from the constructor
# arguments on first use. Creation is guarded by the instance mutex.
#
# @return [Poseidon::Producer] the memoized producer
# @raise [RuntimeError] when invoked on the EventMachine reactor thread
def make_producer
  raise "Should not be called in the reactor thread" if EM.reactor_thread?

  @mutex.synchronize { @producer ||= Poseidon::Producer.new(*@args) }
end
send_messages(messages)
click to toggle source
# File lib/ffwd/plugin/kafka/producer.rb, line 48
# Hand the messages to the producer's send_messages through execute.
#
# @param messages passed unchanged to the producer's send_messages
# @return whatever execute returns for the submitted block
def send_messages(messages)
  execute { |producer| producer.send_messages(messages) }
end
shutdown()
click to toggle source
# File lib/ffwd/plugin/kafka/producer.rb, line 40
# Shut down the underlying producer, if one was ever created.
#
# Does nothing while a request is pending. It is also safe to call when no
# producer exists yet (e.g. stop before the first send) and when called
# repeatedly: the producer reference is dropped once it has been shut down,
# so the same producer is never shut down twice.
#
# @return the result of the producer's shutdown, or nil when nothing was done
def shutdown
  return if @request

  @mutex.synchronize do
    producer = @producer
    @producer = nil
    producer.shutdown if producer
  end
end
stop()
click to toggle source
# File lib/ffwd/plugin/kafka/producer.rb, line 35
# Mark the proxy as stopped, then attempt to shut the producer down.
#
# The flag is set first so that execute rejects any new work from this point.
#
# @return whatever shutdown returns
def stop
  @stopped = true

  shutdown
end