class Fluent::Plugin::SQSPollInput
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sqs_poll.rb, line 15
# Configuration hook. This plugin adds no handling of its own here: the
# configuration object is forwarded untouched to the superclass
# implementation, and that implementation's result is returned.
#
# @param conf the configuration object handed in by the caller
# @return whatever the superclass +configure+ returns
def configure(conf)
  super(conf)
end
poll()
click to toggle source
# File lib/fluent/plugin/in_sqs_poll.rb, line 33
# Polls the SQS queue at @sqs_url and emits one event per received message
# through +router+ under @tag. Intended to run until @terminate becomes
# truthy; the flag is checked before every request and after every batch.
#
# Each emitted record carries the message body, receipt handle, message id,
# MD5 of the body and the 'ApproximateReceiveCount' attribute.
#
# Side effects: updates the process-wide Aws.config (region, and credentials
# when both @aws_access_key and @aws_secret_key are set).
#
# @return the value returned by Aws::SQS::QueuePoller#poll
def poll
  # The region is taken from the second dot-separated component of the URL,
  # e.g. "https://sqs.us-east-1.amazonaws.com/..." -> "us-east-1".
  # NOTE(review): URLs without the region in that position would yield a
  # wrong region - confirm which queue URL formats must be supported.
  region = @sqs_url.split('.')[1]
  Aws.config.update(region: region)

  if @aws_access_key && @aws_secret_key
    Aws.config.update(
      credentials: Aws::Credentials.new(@aws_access_key, @aws_secret_key)
    )
  end

  poller = Aws::SQS::QueuePoller.new(@sqs_url)

  poller.before_request do |_stats|
    throw :stop_polling if @terminate
  end

  poller.poll(max_number_of_messages: @max_number_of_messages) do |messages|
    # Per the aws-sdk QueuePoller documentation the block receives an Array
    # only when more than one message is requested; with a limit of 1 it
    # receives a single message object. Normalise so both shapes work.
    # (Kernel#Array is avoided on purpose: it would call #to_a on a
    # struct-like message and split it into its fields.)
    batch = messages.is_a?(Array) ? messages : [messages]

    batch.each do |msg|
      begin
        router.emit(@tag, Time.now.to_i,
          {
            'body' => msg.body,
            'handle' => msg.receipt_handle,
            'id' => msg.message_id,
            'md5' => msg.md5_of_body,
            'sqs_receive_count' => msg.attributes['ApproximateReceiveCount'],
          }
        )
      rescue StandardError => e
        # Best effort per message: log and carry on with the next one.
        # Only StandardError is rescued so that Interrupt, SystemExit,
        # NoMemoryError and similar are not silently swallowed.
        $log.error("SQS exception", error: e.to_s, error_class: e.class.to_s)
        $log.warn_backtrace(e.backtrace)
      end
    end

    throw :stop_polling if @terminate
  end
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sqs_poll.rb, line 26
# Shutdown hook: runs the superclass shutdown, raises the @terminate flag
# that the polling loop checks, then waits for the polling thread to finish.
#
# Safe to call when no polling thread exists (for example if +start+ never
# ran); in that case only the flag is set.
#
# @return the joined thread, or nil when there was no thread to join
def shutdown
  super
  @terminate = true
  # Guard against shutdown without a prior start, where @thread is nil.
  @thread.join if @thread
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sqs_poll.rb, line 19
# Start hook: runs the superclass start, clears the @terminate flag and
# launches a background thread that executes +poll+. The thread is kept in
# @thread so it can be joined later.
#
# @return [Thread] the newly created polling thread
def start
  super

  @terminate = false
  @thread = Thread.new { poll }
end