class Fluent::Plugin::SQSOutput
Constants
- DEFAULT_BUFFER_TYPE
Public Instance Methods
client()
click to toggle source
# File lib/fluent/plugin/out_sqs.rb, line 48
# Returns the memoized Aws::SQS::Client for this plugin instance.
#
# The client is created lazily on first use. The instance's own credentials
# and region are passed explicitly instead of relying only on the
# process-global Aws.config: that global is replaced wholesale by every
# #configure call, so without explicit options a client built later could
# pick up settings written by another plugin instance.
#
# Settings that are nil are omitted so the SDK's own defaults (including
# whatever is currently in Aws.config) still apply to them.
#
# @return [Aws::SQS::Client]
def client
  @client ||= begin
    options = {
      access_key_id: @aws_key_id,
      secret_access_key: @aws_sec_key,
      region: @region
    }.reject { |_name, value| value.nil? }

    Aws::SQS::Client.new(options)
  end
end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_sqs.rb, line 33
# Validates the plugin configuration and installs the AWS settings.
#
# Steps, in order:
# 1. Converts legacy parameters via compat_parameters_convert for the
#    :buffer and :inject sections.
# 2. Delegates to the superclass implementation.
# 3. Rejects a configuration whose queue name or queue URL ends in ".fifo"
#    when no message_group_id has been set.
# 4. Assigns the access key, secret key and region to Aws.config.
#
# NOTE(review): step 4 replaces the module-level Aws.config hash entirely,
# so it affects everything in the process that reads Aws.config.
#
# @param conf the configuration object handed over by the caller
# @raise [Fluent::ConfigError] if a FIFO queue is targeted without
#   message_group_id
def configure(conf)
  compat_parameters_convert(conf, :buffer, :inject)
  super

  targets_fifo = [@queue_name, @sqs_url].any? do |identifier|
    !identifier.nil? && identifier.end_with?('.fifo')
  end

  if targets_fifo && @message_group_id.nil?
    raise Fluent::ConfigError, 'message_group_id parameter is required for FIFO queue'
  end

  Aws.config = {
    access_key_id: @aws_key_id,
    secret_access_key: @aws_sec_key,
    region: @region
  }
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_sqs.rb, line 72
# Serializes one event for buffering.
#
# When @include_tag is set, the tag is first stored in the record under the
# key @tag_property_name (this mutates the record passed in). The record is
# then run through inject_values_to_record and the result is converted with
# #to_msgpack.
#
# @param tag the event tag
# @param time the event time
# @param record the event record; expected to support []= (presumably a
#   Hash — confirm against caller)
# @return the value of #to_msgpack on the injected record
def format(tag, time, record)
  if @include_tag
    record[@tag_property_name] = tag
  end

  inject_values_to_record(tag, time, record).to_msgpack
end
formatted_to_msgpack_binary()
click to toggle source
# File lib/fluent/plugin/out_sqs.rb, line 79
# Always returns true.
#
# NOTE(review): presumably this tells the host framework that #format emits
# msgpack binary, which #write relies on when it iterates the chunk with
# msgpack_each — confirm against the Fluentd output plugin API.
#
# @return [true]
def formatted_to_msgpack_binary
  true
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_sqs.rb, line 83
# Always returns true.
#
# NOTE(review): presumably declares that this plugin may run under a
# multi-worker setup — confirm against the Fluentd output plugin API.
#
# @return [true]
def multi_workers_ready?
  true
end
queue()
click to toggle source
# File lib/fluent/plugin/out_sqs.rb, line 56
# Returns the memoized queue object that messages are sent to.
#
# Resolution order on first call:
# 1. @create_queue and @queue_name both set -> resource.create_queue
# 2. @sqs_url set                           -> resource.queue(@sqs_url)
# 3. otherwise                              -> resource.get_queue_by_name
#
# The result is cached in @queue, so the lookup (or creation) runs at most
# once per instance as long as it returns a truthy value.
#
# @return the queue object produced by #resource
def queue
  @queue ||=
    if @create_queue && @queue_name
      resource.create_queue(queue_name: @queue_name)
    elsif @sqs_url
      resource.queue(@sqs_url)
    else
      resource.get_queue_by_name(queue_name: @queue_name)
    end
end
resource()
click to toggle source
# File lib/fluent/plugin/out_sqs.rb, line 52
# Returns the memoized Aws::SQS::Resource, built on top of #client the first
# time it is requested.
#
# @return [Aws::SQS::Resource]
def resource
  return @resource if @resource

  @resource = Aws::SQS::Resource.new(client: client)
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_sqs.rb, line 87
# Sends every record of a buffer chunk to the queue in batches.
#
# Each record is serialized with Yajl.dump and grouped into batches so that
# no batch holds more than SQS_BATCH_SEND_MAX_MSGS entries or more than
# SQS_BATCH_SEND_MAX_SIZE bytes of message bodies. A record whose body alone
# exceeds SQS_BATCH_SEND_MAX_SIZE can never be sent; it is logged (truncated)
# and dropped.
#
# The oversized check runs before any batch bookkeeping, so a dropped record
# neither closes the batch being filled nor inflates the running byte count.
# Neighbouring records therefore still share a batch and no empty batches
# are produced.
#
# @param chunk buffer chunk; must respond to msgpack_each and yield one
#   record per call
# @return [nil]
def write(chunk)
  batches = [[]]
  batch_bytes = 0

  chunk.msgpack_each do |record|
    body = Yajl.dump(record)
    body_bytes = body.bytesize

    if body_bytes > SQS_BATCH_SEND_MAX_SIZE
      log.warn 'Could not push message to SQS, payload exceeds ' \
               "#{SQS_BATCH_SEND_MAX_SIZE} bytes. " \
               "(Truncated message: #{body[0..200]})"
      next
    end

    # Open a new batch when this record would break either limit.
    if batch_bytes + body_bytes > SQS_BATCH_SEND_MAX_SIZE ||
       batches.last.length >= SQS_BATCH_SEND_MAX_MSGS
      batches << []
      batch_bytes = 0
    end
    batch_bytes += body_bytes

    entry = {
      id: "#{@tag_property_name}#{SecureRandom.hex(16)}",
      message_body: body,
      delay_seconds: @delay_seconds
    }
    entry[:message_group_id] = @message_group_id unless @message_group_id.nil?
    batches.last << entry
  end

  batches.each do |entries|
    # A single SQS batch request accepts at most 10 entries; an empty batch
    # yields no slices and so sends nothing.
    entries.each_slice(10) { |slice| queue.send_messages(entries: slice) }
  end

  nil
end