class Fluent::Plugin::PulsarInput
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_pulsar.rb, line 47 def configure(conf) super end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_pulsar.rb, line 51 def start super client = Message::PulsarClient.new() client.connect(@pulsar_host, @pulsar_port) client.subscribe(@pulsar_topic, @pulsar_subscription, @pulsar_subtype) while true do m = client.get_message() if m != nil time = Fluent::Engine.now record = {"message_entry_id"=>m.message_entry_id, "message_ledger_id"=>m.message_ledger_id, "client_created_id"=>m.client_created_id, "message"=>m.message} client.ack(m.client_created_id, m.message_ledger_id, m.message_entry_id) router.emit(@tag, time, record) else sleep @pull_duration.to_f end end end