class Fluent::Plugin::SQSInput
Public Instance Methods
client()
click to toggle source
# File lib/fluent/plugin/in_sqs.rb, line 38 def client @client ||= Aws::SQS::Client.new(stub_responses: @stub_responses) end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sqs.rb, line 22 def configure(conf) super Aws.config = { access_key_id: @aws_key_id, secret_access_key: @aws_sec_key, region: @region } end
queue()
click to toggle source
# File lib/fluent/plugin/in_sqs.rb, line 42 def queue @queue ||= Aws::SQS::Resource.new(client: client).queue(@sqs_url) end
run()
click to toggle source
# File lib/fluent/plugin/in_sqs.rb, line 50 def run queue.receive_messages( max_number_of_messages: @max_number_of_messages, wait_time_seconds: @wait_time_seconds, visibility_timeout: @visibility_timeout ).each do |message| record = parse_message(message) message.delete if @delete_message router.emit(@tag, Fluent::Engine.now, record) end rescue log.error 'failed to emit or receive', error: $ERROR_INFO.to_s, error_class: $ERROR_INFO.class.to_s log.warn_backtrace $ERROR_INFO.backtrace end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sqs.rb, line 46 def shutdown super end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sqs.rb, line 32 def start super timer_execute(:in_sqs_run_periodic_timer, @receive_interval, &method(:run)) end
Private Instance Methods
parse_message(message)
click to toggle source
# File lib/fluent/plugin/in_sqs.rb, line 69 def parse_message(message) { 'body' => message.body.to_s, 'receipt_handle' => message.receipt_handle.to_s, 'message_id' => message.message_id.to_s, 'md5_of_body' => message.md5_of_body.to_s, 'queue_url' => message.queue_url.to_s, 'sender_id' => message.attributes['SenderId'].to_s } end