class Fluent::Plugin::SQSOutput

Constants

DEFAULT_BUFFER_TYPE

Public Instance Methods

client() click to toggle source
# File lib/fluent/plugin/out_sqs.rb, line 48
def client
  @client ||= Aws::SQS::Client.new
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_sqs.rb, line 33
def configure(conf)
  compat_parameters_convert(conf, :buffer, :inject)
  super

  if (!@queue_name.nil? && @queue_name.end_with?('.fifo')) || (!@sqs_url.nil? && @sqs_url.end_with?('.fifo'))
    raise Fluent::ConfigError, 'message_group_id parameter is required for FIFO queue' if @message_group_id.nil?
  end

  Aws.config = {
    access_key_id: @aws_key_id,
    secret_access_key: @aws_sec_key,
    region: @region
  }
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_sqs.rb, line 72
def format(tag, time, record)
  record[@tag_property_name] = tag if @include_tag
  record = inject_values_to_record(tag, time, record)

  record.to_msgpack
end
formatted_to_msgpack_binary() click to toggle source
# File lib/fluent/plugin/out_sqs.rb, line 79
def formatted_to_msgpack_binary
  true
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_sqs.rb, line 83
def multi_workers_ready?
  true
end
queue() click to toggle source
# File lib/fluent/plugin/out_sqs.rb, line 56
def queue
  return @queue if @queue

  @queue = if @create_queue && @queue_name
             resource.create_queue(queue_name: @queue_name)
           else
             @queue = if @sqs_url
                        resource.queue(@sqs_url)
                      else
                        resource.get_queue_by_name(queue_name: @queue_name)
                      end
           end

  @queue
end
resource() click to toggle source
# File lib/fluent/plugin/out_sqs.rb, line 52
def resource
  @resource ||= Aws::SQS::Resource.new(client: client)
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_sqs.rb, line 87
def write(chunk)
  batch_records = []
  batch_size = 0
  send_batches = [batch_records]

  chunk.msgpack_each do |record|
    body = Yajl.dump(record)
    batch_size += body.bytesize

    if batch_size > SQS_BATCH_SEND_MAX_SIZE ||
       batch_records.length >= SQS_BATCH_SEND_MAX_MSGS
      batch_records = []
      batch_size = body.bytesize
      send_batches << batch_records
    end

    if batch_size > SQS_BATCH_SEND_MAX_SIZE
      log.warn 'Could not push message to SQS, payload exceeds ' \
               "#{SQS_BATCH_SEND_MAX_SIZE} bytes.  " \
               "(Truncated message: #{body[0..200]})"
    else
      id = "#{@tag_property_name}#{SecureRandom.hex(16)}"
      batch_record = { id: id, message_body: body, delay_seconds: @delay_seconds }
      batch_record[:message_group_id] = @message_group_id unless @message_group_id.nil?
      batch_records << batch_record
    end
  end

  until send_batches.length <= 0
    records = send_batches.shift
    until records.length <= 0
      queue.send_messages(entries: records.slice!(0..9))
    end
  end
end