class Fluent::Plugin::SQSInput

Public Instance Methods

client() click to toggle source
# File lib/fluent/plugin/in_sqs.rb, line 38
def client
  @client ||= Aws::SQS::Client.new(stub_responses: @stub_responses)
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sqs.rb, line 22
def configure(conf)
  super

  Aws.config = {
    access_key_id: @aws_key_id,
    secret_access_key: @aws_sec_key,
    region: @region
  }
end
queue() click to toggle source
# File lib/fluent/plugin/in_sqs.rb, line 42
def queue
  @queue ||= Aws::SQS::Resource.new(client: client).queue(@sqs_url)
end
run() click to toggle source
# File lib/fluent/plugin/in_sqs.rb, line 50
def run
  queue.receive_messages(
    max_number_of_messages: @max_number_of_messages,
    wait_time_seconds: @wait_time_seconds,
    visibility_timeout: @visibility_timeout
  ).each do |message|
    record = parse_message(message)

    message.delete if @delete_message

    router.emit(@tag, Fluent::Engine.now, record)
  end
rescue
  log.error 'failed to emit or receive', error: $ERROR_INFO.to_s, error_class: $ERROR_INFO.class.to_s
  log.warn_backtrace $ERROR_INFO.backtrace
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sqs.rb, line 46
def shutdown
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sqs.rb, line 32
def start
  super

  timer_execute(:in_sqs_run_periodic_timer, @receive_interval, &method(:run))
end

Private Instance Methods

parse_message(message) click to toggle source
# File lib/fluent/plugin/in_sqs.rb, line 69
def parse_message(message)
  obj = {}
  message.attributes.each do |key, value|
    obj[key] = value.to_s
    puts "#{key}:#{value}"
  end
  obj['body'] = message.body.to_s,
  obj['receipt_handle'] = message.receipt_handle.to_s,
  obj['message_id'] = message.message_id.to_s,
  obj['md5_of_body'] = message.md5_of_body.to_s,
  obj['queue_url'] = message.queue_url.to_s,
  obj['sender_id'] = message.attributes['SenderId'].to_s
  return obj
end