class Fluent::Plugin::PulsarInput

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_pulsar.rb, line 47
def configure(conf)
  super

end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_pulsar.rb, line 51
def start
  super
  client = Message::PulsarClient.new()
  client.connect(@pulsar_host, @pulsar_port)
  client.subscribe(@pulsar_topic, @pulsar_subscription, @pulsar_subtype)
  while true do
    m = client.get_message()
    if m != nil
      time = Fluent::Engine.now
      record = {"message_entry_id"=>m.message_entry_id, "message_ledger_id"=>m.message_ledger_id,  "client_created_id"=>m.client_created_id, "message"=>m.message}
      client.ack(m.client_created_id, m.message_ledger_id, m.message_entry_id)
      router.emit(@tag, time, record)
    else
      sleep @pull_duration.to_f
    end
  end
end