class FluentPluginKinesisAggregation::OutputFilter
Constants
- DEFAULT_BUFFER_TYPE
- FLUENTD_MAX_BUFFER_SIZE
200 is an arbitrary number, larger than the envelope overhead and big enough to hold the partition/hash key table in AggregatedRecords. Note that you should never really set the buffer this high, since the write is likely to fail if anyone else is writing to the shard at the same time.
- KPL_MAGIC_NUMBER
github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md
- NAME
- PUT_RECORD_MAX_DATA_SIZE
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_kinesis-aggregation.rb, line 88
# Validates the plugin configuration.
#
# Converts the :buffer and :inject compat parameters, defers to the
# superclass, then checks the configured buffer chunk limit against
# FLUENTD_MAX_BUFFER_SIZE.
#
# @param conf the configuration object handed to the plugin
# @raise [Fluent::ConfigError] if the chunk limit exceeds FLUENTD_MAX_BUFFER_SIZE
def configure(conf)
  compat_parameters_convert(conf, :buffer, :inject)
  super

  chunk_limit = @buffer.chunk_limit_size

  # Hard stop: a chunk above the maximum can never be written.
  if chunk_limit > FLUENTD_MAX_BUFFER_SIZE
    raise Fluent::ConfigError, "Kinesis buffer_chunk_limit is set to more than the 1mb shard limit (i.e. you won't be able to write your chunks!"
  end

  # Soft warning: above a third of the maximum, several producers sharing
  # a shard may contend with each other.
  log.warn 'Kinesis buffer_chunk_limit is set at more than 1/3 of the per second shard limit (1mb). This is not good if you have many producers.' if chunk_limit > FLUENTD_MAX_BUFFER_SIZE / 3
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_kinesis-aggregation.rb, line 106
# Serialises one event as an encoded AggregatedRecord holding a single Record.
#
# The record is first passed through inject_values_to_record, then dumped
# with Yajl and converted to a binary string to become the Record's data.
#
# @param tag    the event tag
# @param time   the event time
# @param record the event record
# @return the result of AggregatedRecord.encode for the one-element aggregate
def format(tag, time, record)
  enriched = inject_values_to_record(tag, time, record)
  payload = Yajl.dump(enriched).b

  # partition_key_index is 1 rather than 0; #write builds the matching
  # two-entry partition key table.
  entry = Record.new(partition_key_index: 1, data: payload)

  AggregatedRecord.encode(AggregatedRecord.new(records: [entry]))
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_kinesis-aggregation.rb, line 101
# Starts the plugin: runs the superclass start-up first, then builds the
# client via load_client.
#
# @return whatever load_client returns
def start
  super
  load_client
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_kinesis-aggregation.rb, line 117
# Sends one buffer chunk to the stream as a single aggregated record.
#
# The chunk content is prefixed with an encoded partition key table,
# wrapped by kpl_aggregation_pack and submitted with @client.put_record.
# A chunk larger than FLUENTD_MAX_BUFFER_SIZE bytes is logged and dropped
# instead of raising, because retrying it could never succeed.
#
# @param chunk an object whose #read returns the concatenated, already
#   formatted records
# @return nil when the chunk is dropped, otherwise the put_record result
def write(chunk)
  records = chunk.read

  # FLUENTD_MAX_BUFFER_SIZE is a byte budget, so measure bytes. String#length
  # counts characters and under-reports a payload tagged with a multibyte
  # encoding.
  records_size = records.bytesize
  if records_size > FLUENTD_MAX_BUFFER_SIZE
    log.error "Can't emit aggregated #{@stream_name} stream record of length #{records_size} (more than #{FLUENTD_MAX_BUFFER_SIZE})"
    return # do not throw, since we can't retry
  end

  partition_key = @fixed_partition_key || SecureRandom.uuid

  # confusing magic. Because of the format of protobuf records,
  # it's valid (in this case) to concatenate the AggregatedRecords
  # to form one AggregatedRecord, since we only have a repeated field
  # in records.
  #
  # ALSO, since we use google's protobuf stuff (much better
  # memory usage due to C extension), we're stuck on proto3.
  # Unfortunately, KPL uses proto2 form, and partition_key_index
  # is a required field. If we set it to 0 in proto3, though,
  # it's helpfully ignored in the serialisation (default!).
  # Therefore we have to pass a partition_key_index of 1,
  # and put two things in our partition_key_table.
  message = AggregatedRecord.encode(AggregatedRecord.new(
    partition_key_table: ['a', partition_key]
  )) + records

  @client.put_record(
    stream_name: @stream_name,
    data: kpl_aggregation_pack(message),
    partition_key: partition_key
  )
end
Private Instance Methods
kpl_aggregation_pack(message)
click to toggle source
# File lib/fluent/plugin/out_kinesis-aggregation.rb, line 153
# Wraps an encoded message in its envelope: KPL_MAGIC_NUMBER in a 4-byte
# field, the message itself, then the MD5 digest of the message in a
# 16-byte field.
#
# @param message [String] the encoded aggregated record
# @return [String] the packed envelope
def kpl_aggregation_pack(message)
  checksum = Digest::MD5.digest(message)
  [KPL_MAGIC_NUMBER, message, checksum].pack("A4A*A16")
end
load_client()
click to toggle source
This code is unchanged from github.com/awslabs/aws-fluent-plugin-kinesis
# File lib/fluent/plugin/out_kinesis-aggregation.rb, line 160
# Builds the Aws::Kinesis::Client used by the plugin and stores it in @client.
#
# Credentials are selected by the first source that is configured, in this
# order: a static key pair (@aws_key_id and @aws_sec_key), a shared
# credentials profile (@profile, optionally @credentials_path), then an
# assumed role (@role_arn). If none is configured, no credential options
# are set here. Region, debug logging and HTTP proxy are applied when their
# settings are present.
#
# @return the newly created Aws::Kinesis::Client
def load_client
  client_options = { user_agent_suffix: "fluent-#{NAME}" }
  client_options[:region] = @region if @region

  if @aws_key_id && @aws_sec_key
    client_options[:access_key_id] = @aws_key_id
    client_options[:secret_access_key] = @aws_sec_key
  elsif @profile
    shared_opts = { profile_name: @profile }
    shared_opts[:path] = @credentials_path if @credentials_path
    client_options[:credentials] = Aws::SharedCredentials.new(shared_opts)
  elsif @role_arn
    # The STS client is built from the options gathered so far
    # (user agent suffix and, when set, region).
    sts_client = Aws::STS::Client.new(client_options)
    client_options[:credentials] = Aws::AssumeRoleCredentials.new(
      client: sts_client,
      role_arn: @role_arn,
      role_session_name: "fluent-plugin-kinesis-aggregation",
      external_id: @external_id,
      duration_seconds: 60 * 60
    )
  end

  if @debug
    client_options[:logger] = Logger.new(log.out)
    client_options[:log_level] = :debug
  end

  client_options[:http_proxy] = @http_proxy if @http_proxy

  @client = Aws::Kinesis::Client.new(client_options)
end