module Fluent::Plugin::KinesisHelper::API::BatchRequest

Public Class Methods

included(mod) click to toggle source
# File lib/fluent/plugin/kinesis_helper/api.rb, line 49
# Hook Ruby invokes when this module is mixed into +mod+: the receiver
# additionally gets BatchRequestParams mixed in.
def self.included(mod)
  mod.send(:include, BatchRequestParams)
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/kinesis_helper/api.rb, line 53
# Runs the superclass configuration, then resolves the two batch limits.
#
# A limit that is still nil afterwards falls back to the including class's
# BatchRequestLimitCount / BatchRequestLimitSize constant; a limit larger
# than that constant is rejected.
#
# conf - configuration object, forwarded unchanged to +super+.
#
# Raises ConfigError when @batch_request_max_count or
# @batch_request_max_size exceeds the corresponding class-level constant.
def configure(conf)
  super

  count_limit = self.class::BatchRequestLimitCount
  if @batch_request_max_count.nil?
    @batch_request_max_count = count_limit
  elsif @batch_request_max_count > count_limit
    raise ConfigError, "batch_request_max_count can't be greater than #{count_limit}."
  end

  size_limit = self.class::BatchRequestLimitSize
  if @batch_request_max_size.nil?
    @batch_request_max_size = size_limit
  elsif @batch_request_max_size > size_limit
    raise ConfigError, "batch_request_max_size can't be greater than #{size_limit}."
  end
end
size_of_values(record) click to toggle source
# File lib/fluent/plugin/kinesis_helper/api.rb, line 67
# Adds up the +size+ of every non-nil element of +record+.
#
# record - collection of values; nil entries are dropped before summing.
#
# Returns the total, or 0 when nothing is left after dropping nils.
def size_of_values(record)
  record.compact.inject(0) { |total, value| total + value.size }
end

Private Instance Methods

any_records_shipped?(res) click to toggle source
# File lib/fluent/plugin/kinesis_helper/api.rb, line 111
# Tells whether the response reports more results than failures.
#
# res - response object understood by +results+ and +failed_count+.
#
# Returns true when the number of results exceeds the failed count.
def any_records_shipped?(res)
  total = results(res).size
  failed = failed_count(res)
  total > failed
end
batch_request_with_retry(batch, retry_count=0, backoff: nil) { |batch| ... } click to toggle source
# File lib/fluent/plugin/kinesis_helper/api.rb, line 89
# Yields +batch+ to the given block and, while the response reports failed
# records, re-submits only those records after a backoff wait.
#
# batch       - records handed to the block.
# retry_count - number of retries already performed (0 on the first call).
# backoff     - object answering +next+ and +reset+; a fresh Backoff is
#               created when nil.
#
# Returns nil when the response reports no failures; otherwise the value of
# the recursive retry, or of +give_up_retries+ once retry_count has reached
# @retries_on_batch_request.
def batch_request_with_retry(batch, retry_count=0, backoff: nil, &block)
  backoff ||= Backoff.new
  res = yield(batch)
  return unless failed_count(res) > 0

  failed_records = collect_failed_records(batch, res)
  return give_up_retries(failed_records) unless retry_count < @retries_on_batch_request

  # Partial progress optionally restarts the backoff sequence.
  backoff.reset if @reset_backoff_if_success && any_records_shipped?(res)
  wait_second = backoff.next
  next_attempt = retry_count + 1

  # String#% is used on purpose instead of a bare format(...) call, which
  # could be shadowed by a method of the including class.
  msg = 'Retrying to request batch. Retry count: %3d, Retry records: %3d, Wait seconds %3.2f' % [next_attempt, failed_records.size, wait_second]
  log.warn(truncate(msg))

  # TODO: sleep() sometimes does not wait for the requested number of seconds.
  # The root cause is still unknown, so only debug output brackets the call
  # for now; this should be fixed properly in the future.
  thread_id = Thread.current.object_id
  log.debug("#{thread_id} sleep start")
  sleep(wait_second)
  log.debug("#{Thread.current.object_id} sleep finish")

  batch_request_with_retry(retry_records(failed_records), next_attempt, backoff: backoff, &block)
end
collect_failed_records(records, res) click to toggle source
# File lib/fluent/plugin/kinesis_helper/api.rb, line 115
# Builds one entry per response result that carries an :error_code.
#
# records - the records that were submitted.
# res     - response object understood by +results+.
#
# Each entry is a Hash with :original, :error_code and :error_message.
# :original is the record at the same index for :streams / :firehose, the
# whole +records+ argument for :streams_aggregated, and nil for any other
# request_type.
#
# Returns the Array of entries (empty when no result has an error code).
def collect_failed_records(records, res)
  results(res).each_with_index.each_with_object([]) do |(result, position), failures|
    next unless result[:error_code]

    source =
      case request_type
      when :streams, :firehose then records[position]
      when :streams_aggregated then records
      end

    failures << {
      original:      source,
      error_code:    result[:error_code],
      error_message: result[:error_message]
    }
  end
end
failed_count(res) click to toggle source
# File lib/fluent/plugin/kinesis_helper/api.rb, line 141
# Reads the failure counter from a response.
#
# The field is :failed_record_count for :streams and :streams_aggregated,
# and :failed_put_count for :firehose; any other request_type looks up nil.
#
# res - response object indexable with +[]+.
#
# Returns whatever the response holds under that field.
def failed_count(res)
  counter_key =
    case request_type
    when :streams, :streams_aggregated then :failed_record_count
    when :firehose then :failed_put_count
    end
  res[counter_key]
end
give_up_retries(failed_records) click to toggle source
# File lib/fluent/plugin/kinesis_helper/api.rb, line 159
# Logs one error line per record that could not be delivered.
#
# failed_records - entries with :error_code, :error_message and :original.
#
# Returns +failed_records+ (the result of +each+).
def give_up_retries(failed_records)
  failed_records.each do |failure|
    details = [failure[:error_code], failure[:error_message], failure[:original]]
    # String#% rather than a bare format(...) call, which the including
    # class could shadow with its own method.
    message = 'Could not put record, Error: %s/%s, Record: %s' % details
    log.error(truncate(message))
  end
end
results(res) click to toggle source
# File lib/fluent/plugin/kinesis_helper/api.rb, line 150
# Reads the per-record results from a response.
#
# The field is :records for :streams and :streams_aggregated, and
# :request_responses for :firehose; any other request_type looks up nil.
#
# res - response object indexable with +[]+.
#
# Returns whatever the response holds under that field.
def results(res)
  list_key =
    case request_type
    when :streams, :streams_aggregated then :records
    when :firehose then :request_responses
    end
  res[list_key]
end
retry_records(failed_records) click to toggle source
# File lib/fluent/plugin/kinesis_helper/api.rb, line 132
# Extracts what has to be submitted again from the failure entries.
#
# failed_records - entries carrying the submitted data under :original.
#
# Returns every entry's :original for :streams / :firehose, the first
# entry's :original for :streams_aggregated, and nil for any other
# request_type.
def retry_records(failed_records)
  type = request_type
  if [:streams, :firehose].include?(type)
    return failed_records.map { |failure| failure[:original] }
  end

  failed_records.first[:original] if type == :streams_aggregated
end
split_to_batches(records) { |batch, size| ... } click to toggle source
# File lib/fluent/plugin/kinesis_helper/api.rb, line 73
  # Walks +records+ in order and yields them in consecutive batches that
  # respect @batch_request_max_count and @batch_request_max_size.
  #
  # records - enumerable of records; each is measured with +size_of_values+.
  #
  # Yields each batch (Array of records) together with its accumulated size.
  # An empty batch is never yielded: a record that on its own exceeds a
  # limit is placed alone in a batch rather than being preceded by an empty
  # one.
  #
  # Returns the value of the last yield for a trailing batch, or nil when no
  # trailing batch remains.
  def split_to_batches(records, &block)
    batch = []
    size = 0
    records.each do |record|
      record_size = size_of_values(record)
      # Flush before adding the record if it would push the batch past a
      # limit. The emptiness guard prevents handing an empty batch to the
      # caller when the very first record already exceeds a limit.
      if !batch.empty? &&
         (batch.size + 1 > @batch_request_max_count || size + record_size > @batch_request_max_size)
        yield(batch, size)
        batch = []
        size = 0
      end
      batch << record
      size += record_size
    end
    yield(batch, size) unless batch.empty?
  end