class Fluent::Plugin::KinesisStreamsAggregatedOutput
Constants
- BatchRequestLimitCount
- BatchRequestLimitSize
- RequestType
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
Fluent::Plugin::KinesisHelper::API::BatchRequest#configure
# Plugin configuration hook. After the batch-request helper finishes its
# own setup (via +super+), build the partition-key generator and shrink
# both size limits by the aggregation overhead so aggregated payloads
# still fit under the Kinesis request/record limits.
def configure(conf)
  super
  @partition_key_generator = create_partition_key_generator
  aggregation_overhead = offset
  @batch_request_max_size -= aggregation_overhead
  @max_record_size -= aggregation_overhead
end
format(tag, time, record)
click to toggle source
# Serialize one event with the configured data formatter and wrap it in
# the single-element array shape the batch-request helper expects.
def format(tag, time, record)
  format_for_api { [@data_formatter.call(tag, time, record)] }
end
offset()
click to toggle source
# Memoized per-record overhead of aggregation: the fixed aggregate
# header size plus twice the partition key length.
# NOTE(review): the *2 presumably accounts for the key appearing both in
# the aggregated payload and in the request entry — confirm upstream.
def offset
  @offset ||= begin
    key_length = @partition_key_generator.call.size
    AggregateOffset + key_length * 2
  end
end
write(chunk)
click to toggle source
# Flush a buffer chunk: for each batch produced by the helper, aggregate
# every record into one KPL-style aggregated record under a single
# partition key and ship it with one PutRecords call.
def write(chunk)
  write_records_batch(chunk) do |batch|
    partition_key = @partition_key_generator.call
    # Each batch entry is a one-element array produced by #format;
    # pull out the serialized data payloads.
    payloads = batch.map { |entry| entry.first }
    aggregated = aggregator.aggregate(payloads, partition_key)
    client.put_records(
      stream_name: @stream_name,
      records: [{ partition_key: partition_key, data: aggregated }],
    )
  end
end
Private Instance Methods
create_partition_key_generator()
click to toggle source
# Returns a zero-arg callable that yields the partition key for a
# request: the configured fixed key when +fixed_partition_key+ is set,
# otherwise a fresh random 32-char hex key per call (spreads records
# across shards).
def create_partition_key_generator
  @fixed_partition_key.nil? ? -> { SecureRandom.hex(16) } : -> { @fixed_partition_key }
end
size_of_values(record)
click to toggle source
Calls superclass method
Fluent::Plugin::KinesisHelper::API::BatchRequest#size_of_values
# Size accounting for batching: the helper's base size for the record
# plus the fixed per-record aggregation overhead.
def size_of_values(record)
  RecordOffset + super(record)
end