class Fluent::Plugin::KinesisOutput
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
Fluent::Plugin::KinesisHelper::API#configure
# File lib/fluent/plugin/kinesis.rb, line 72 def configure(conf) super @data_formatter = data_formatter_create(conf) end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/kinesis.rb, line 77 def multi_workers_ready? true end
Private Instance Methods
compressor_create()
click to toggle source
# File lib/fluent/plugin/kinesis.rb, line 100 def compressor_create case @compression when "zlib" ->(data) { Zlib::Deflate.deflate(data) } else ->(data) { data } end end
data_formatter_create(conf)
click to toggle source
# File lib/fluent/plugin/kinesis.rb, line 83 def data_formatter_create(conf) formatter = formatter_create compressor = compressor_create if @data_key.nil? ->(tag, time, record) { record = inject_values_to_record(tag, time, record) compressor.call(formatter.format(tag, time, record).chomp.b) } else ->(tag, time, record) { raise InvalidRecordError, record unless record.is_a? Hash raise KeyNotFoundError.new(@data_key, record) if record[@data_key].nil? compressor.call(record[@data_key].to_s.b) } end end
format_for_api(&block)
click to toggle source
# File lib/fluent/plugin/kinesis.rb, line 109 def format_for_api(&block) converted = block.call size = size_of_values(converted) if size > @max_record_size raise ExceedMaxRecordSizeError.new(size, converted) end converted.to_msgpack rescue SkipRecordError => e log.error(truncate e) '' end
request_type()
click to toggle source
# File lib/fluent/plugin/kinesis.rb, line 133 def request_type self.class::RequestType end
truncate(msg)
click to toggle source
# File lib/fluent/plugin/kinesis.rb, line 137 def truncate(msg) if @log_truncate_max_size == 0 or (msg.to_s.size <= @log_truncate_max_size) msg.to_s else msg.to_s[0...@log_truncate_max_size] end end
write_records_batch(chunk, &block)
click to toggle source
# File lib/fluent/plugin/kinesis.rb, line 121 def write_records_batch(chunk, &block) unique_id = chunk.dump_unique_id_hex(chunk.unique_id) chunk.open do |io| records = msgpack_unpacker(io).to_enum split_to_batches(records) do |batch, size| log.debug(sprintf "Write chunk %s / %3d records / %4d KB", unique_id, batch.size, size/1024) batch_request_with_retry(batch, &block) log.debug("Finish writing chunk") end end end