class Fluent::Plugin::KinesisOutput

Public Instance Methods

configure(conf)
# File lib/fluent/plugin/kinesis.rb, line 72
# Fluentd lifecycle hook: delegate standard option parsing to the
# superclass, then cache the record-serialization lambda built from
# the parsed configuration for use at format time.
def configure(conf)
  super
  @data_formatter = data_formatter_create(conf)
end
multi_workers_ready?()
# File lib/fluent/plugin/kinesis.rb, line 77
# Advertise multi-worker support to Fluentd: this output keeps no
# cross-worker state, so it is always safe to run under multiple
# worker processes.
def multi_workers_ready?
  true
end

Private Instance Methods

compressor_create()
# File lib/fluent/plugin/kinesis.rb, line 100
# Build the payload-compression lambda selected by @compression.
# Only the value "zlib" enables real compression; any other value
# (including nil) yields an identity pass-through.
def compressor_create
  if @compression == "zlib"
    ->(payload) { Zlib::Deflate.deflate(payload) }
  else
    ->(payload) { payload }
  end
end
data_formatter_create(conf)
# File lib/fluent/plugin/kinesis.rb, line 83
# Build the lambda that turns a (tag, time, record) event into the raw
# byte payload sent to the Kinesis API.
#
# When @data_key is configured, only that field of the record is sent:
# a non-Hash record raises InvalidRecordError and a record missing the
# key raises KeyNotFoundError. Otherwise the whole record is formatted
# (after value injection), the trailing newline stripped, and the
# result compressed per compressor_create.
def data_formatter_create(conf)
  formatter = formatter_create
  compressor = compressor_create
  unless @data_key.nil?
    return ->(tag, time, record) {
      raise InvalidRecordError, record unless record.is_a? Hash
      raise KeyNotFoundError.new(@data_key, record) if record[@data_key].nil?
      compressor.call(record[@data_key].to_s.b)
    }
  end
  ->(tag, time, record) {
    record = inject_values_to_record(tag, time, record)
    compressor.call(formatter.format(tag, time, record).chomp.b)
  }
end
format_for_api(&block)
# File lib/fluent/plugin/kinesis.rb, line 109
# Run +block+ to obtain the converted record values, enforce the
# per-record size cap, and serialize the result with msgpack.
# A record rejected via SkipRecordError is logged (with the message
# truncated for log hygiene) and replaced by an empty string so the
# rest of the chunk keeps flowing.
def format_for_api(&block)
  converted = block.call
  size = size_of_values(converted)
  raise ExceedMaxRecordSizeError.new(size, converted) if size > @max_record_size
  converted.to_msgpack
rescue SkipRecordError => e
  log.error(truncate e)
  ''
end
request_type()
# File lib/fluent/plugin/kinesis.rb, line 133
# Resolve the API request type from the RequestType constant of the
# concrete subclass (looked up through self.class so each derived
# output plugin supplies its own value).
def request_type
  self.class::RequestType
end
truncate(msg)
# File lib/fluent/plugin/kinesis.rb, line 137
# Truncate +msg+ (after to_s) to at most @log_truncate_max_size
# characters for log output; a limit of 0 disables truncation.
#
# Improvements over the original: uses the high-precedence || operator
# instead of the `or` keyword, and stringifies +msg+ once instead of
# up to three times.
def truncate(msg)
  text = msg.to_s
  return text if @log_truncate_max_size == 0 || text.size <= @log_truncate_max_size
  text[0...@log_truncate_max_size]
end
write_records_batch(chunk, &block)
# File lib/fluent/plugin/kinesis.rb, line 121
# Stream a buffer chunk to the Kinesis API: unpack the msgpack-encoded
# records from the chunk's IO, slice them into size-bounded batches,
# and hand each batch to +block+ through the retrying request helper.
def write_records_batch(chunk, &block)
  unique_id = chunk.dump_unique_id_hex(chunk.unique_id)
  chunk.open do |io|
    record_stream = msgpack_unpacker(io).to_enum
    split_to_batches(record_stream) do |batch, byte_size|
      log.debug(format("Write chunk %s / %3d records / %4d KB", unique_id, batch.size, byte_size / 1024))
      batch_request_with_retry(batch, &block)
      log.debug("Finish writing chunk")
    end
  end
end