module Fluent::Plugin::KinesisHelper::API::BatchRequest
Public Class Methods
included(mod)
click to toggle source
# File lib/fluent/plugin/kinesis_helper/api.rb, line 49
#
# Hook that runs when this module is included somewhere: it mixes
# BatchRequestParams into the including module/class +mod+ as well.
def self.included(mod)
  mod.include(BatchRequestParams)
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/kinesis_helper/api.rb, line 53
#
# Runs the superclass +configure+ and then validates the batch limits.
#
# * When @batch_request_max_count / @batch_request_max_size is nil it is
#   defaulted to the including class's BatchRequestLimitCount /
#   BatchRequestLimitSize constant.
# * When a configured value is larger than the corresponding constant a
#   ConfigError is raised.
#
# conf:: configuration object, forwarded untouched to +super+.
#
# Raises ConfigError if either configured limit exceeds the class limit.
def configure(conf)
  super

  # The limits live on the concrete class, so look them up through self.class.
  count_limit = self.class::BatchRequestLimitCount
  if @batch_request_max_count.nil?
    @batch_request_max_count = count_limit
  elsif @batch_request_max_count > count_limit
    raise ConfigError, "batch_request_max_count can't be greater than #{count_limit}."
  end

  size_limit = self.class::BatchRequestLimitSize
  if @batch_request_max_size.nil?
    @batch_request_max_size = size_limit
  elsif @batch_request_max_size > size_limit
    raise ConfigError, "batch_request_max_size can't be greater than #{size_limit}."
  end
end
size_of_values(record)
click to toggle source
# File lib/fluent/plugin/kinesis_helper/api.rb, line 67
#
# Returns the combined size of all non-nil elements of +record+, or 0 when
# there are none.
#
# Elements that respond to +bytesize+ (e.g. Strings) are measured in bytes,
# so multi-byte characters are not under-counted; anything else falls back
# to +size+.
# NOTE(review): the result is compared with @batch_request_max_size in
# split_to_batches, which is presumably a byte limit -- confirm.
#
# record:: a collection responding to +compact+ (nil entries are ignored).
def size_of_values(record)
  record.compact.inject(0) do |total, value|
    total + (value.respond_to?(:bytesize) ? value.bytesize : value.size)
  end
end
Private Instance Methods
any_records_shipped?(res)
click to toggle source
# File lib/fluent/plugin/kinesis_helper/api.rb, line 111
#
# True when the response +res+ holds more per-record results than the
# failure count it reports.
def any_records_shipped?(res)
  total = results(res).size
  failed = failed_count(res)
  total > failed
end
batch_request_with_retry(batch, retry_count=0, backoff: nil) { |batch| ... }
click to toggle source
# File lib/fluent/plugin/kinesis_helper/api.rb, line 89
#
# Yields +batch+ to the given block and inspects the response. If the
# response reports failures, the failed records are collected and the
# request is retried recursively (after sleeping for the backoff interval)
# until +retry_count+ reaches @retries_on_batch_request, at which point
# give_up_retries is called with the records that still failed.
#
# batch::       records handed to the block.
# retry_count:: number of retries already performed (0 on the first call).
# backoff::     backoff object shared across retries; a new Backoff is
#               created when none is given.
def batch_request_with_retry(batch, retry_count=0, backoff: nil, &block)
  backoff ||= Backoff.new
  res = yield(batch)
  return unless failed_count(res) > 0

  failed_records = collect_failed_records(batch, res)
  return give_up_retries(failed_records) unless retry_count < @retries_on_batch_request

  # Restart the backoff sequence when at least part of the batch went through.
  backoff.reset if @reset_backoff_if_success && any_records_shipped?(res)
  wait_second = backoff.next
  attempt = retry_count + 1
  msg = 'Retrying to request batch. Retry count: %3d, Retry records: %3d, Wait seconds %3.2f' % [attempt, failed_records.size, wait_second]
  log.warn(truncate(msg))
  # TODO: sleep() doesn't wait the given seconds sometime.
  # The root cause is unknown so far, so I'd like to add debug print only. It should be fixed in the future.
  log.debug("#{Thread.current.object_id} sleep start")
  sleep(wait_second)
  log.debug("#{Thread.current.object_id} sleep finish")
  batch_request_with_retry(retry_records(failed_records), attempt, backoff: backoff, &block)
end
collect_failed_records(records, res)
click to toggle source
# File lib/fluent/plugin/kinesis_helper/api.rb, line 115
#
# Builds one hash per failed entry in the response +res+. An entry counts
# as failed when its :error_code is truthy. Each hash carries:
#
# :original::      for :streams / :firehose the element of +records+ at the
#                  same index as the failed entry; for :streams_aggregated
#                  the whole +records+ object.
# :error_code::    the entry's :error_code.
# :error_message:: the entry's :error_message.
#
# Returns the array of those hashes (empty when nothing failed).
def collect_failed_records(records, res)
  results(res).each_with_index.each_with_object([]) do |(result, position), failures|
    next unless result[:error_code]

    source =
      case request_type
      when :streams, :firehose then records[position]
      when :streams_aggregated then records
      end
    failures << {
      original: source,
      error_code: result[:error_code],
      error_message: result[:error_message]
    }
  end
end
failed_count(res)
click to toggle source
# File lib/fluent/plugin/kinesis_helper/api.rb, line 141
#
# Reads the failure counter from the response +res+. The key depends on
# request_type: :failed_record_count for :streams and :streams_aggregated,
# :failed_put_count for :firehose.
def failed_count(res)
  counter_key =
    case request_type
    when :streams, :streams_aggregated then :failed_record_count
    when :firehose then :failed_put_count
    end
  res[counter_key]
end
give_up_retries(failed_records)
click to toggle source
# File lib/fluent/plugin/kinesis_helper/api.rb, line 159
#
# Logs one (truncated) error line per entry of +failed_records+, containing
# the entry's :error_code, :error_message and :original values.
def give_up_retries(failed_records)
  failed_records.each do |failure|
    details = [failure[:error_code], failure[:error_message], failure[:original]]
    log.error(truncate('Could not put record, Error: %s/%s, Record: %s' % details))
  end
end
results(res)
click to toggle source
# File lib/fluent/plugin/kinesis_helper/api.rb, line 150
#
# Reads the per-record result list from the response +res+. The key depends
# on request_type: :records for :streams and :streams_aggregated,
# :request_responses for :firehose.
def results(res)
  list_key =
    case request_type
    when :streams, :streams_aggregated then :records
    when :firehose then :request_responses
    end
  res[list_key]
end
retry_records(failed_records)
click to toggle source
# File lib/fluent/plugin/kinesis_helper/api.rb, line 132
#
# Turns the failure hashes in +failed_records+ back into something that can
# be re-sent: for :streams / :firehose the list of every entry's :original,
# for :streams_aggregated the :original of the first entry only.
def retry_records(failed_records)
  case request_type
  when :streams, :firehose then failed_records.map { |failure| failure[:original] }
  when :streams_aggregated then failed_records.first[:original]
  end
end
split_to_batches(records) { |batch, size| ... }
click to toggle source
# File lib/fluent/plugin/kinesis_helper/api.rb, line 73
#
# Groups +records+ into consecutive batches and yields each batch together
# with its accumulated size (as computed by size_of_values).
#
# A batch is flushed before adding a record that would push it past
# @batch_request_max_count entries or @batch_request_max_size in total
# size. An empty batch is never yielded: a single record that is larger
# than @batch_request_max_size on its own is yielded alone in its own
# batch instead of being preceded by an empty one.
#
# records:: enumerable of records.
# Yields::  |batch, size| -- the Array of records and their summed size.
def split_to_batches(records, &block)
  batch = []
  size = 0
  records.each do |record|
    record_size = size_of_values(record)
    over_limit = batch.size + 1 > @batch_request_max_count ||
                 size + record_size > @batch_request_max_size
    # Only flush when there is something to flush; otherwise an oversized
    # first record would cause an empty batch to be handed to the caller.
    if over_limit && !batch.empty?
      yield(batch, size)
      # Start a fresh array so the batch already yielded is not mutated.
      batch = []
      size = 0
    end
    batch << record
    size += record_size
  end
  yield(batch, size) unless batch.empty?
end