class Fluent::Plugin::KinesisStreamsOutput
Constants
- BatchRequestLimitCount
- BatchRequestLimitSize
- RequestType
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_kinesis_streams.rb, line 45
# Runs the superclass constructor, then stores the local machine's hostname
# (as reported by Socket.gethostname) in @default_host.
def initialize
  super
  @default_host = Socket.gethostname
end
placeholder_expander(log)
click to toggle source
Thanks to github.com/kazegusuri/fluent-plugin-prometheus/blob/348c112d/lib/fluent/plugin/prometheus.rb
# File lib/fluent/plugin/out_kinesis_streams.rb, line 52
# Returns a PlaceholderExpander instance built with the given logger, using
# whichever implementation the running Fluentd makes available.
#
# log - logger handed to the expander as the :log option.
#
# Raises ConfigError when the file providing the expander cannot be loaded.
def self.placeholder_expander(log)
  unless defined?(Fluent::Filter)
    # v0.10 has no built-in expander; borrow the one shipped with
    # fluent-plugin-record-reformer.
    begin
      require 'fluent/plugin/out_record_reformer.rb'
      return Fluent::RecordReformerOutput::PlaceholderExpander.new(log: log)
    rescue LoadError => e
      raise ConfigError, "cannot find fluent-plugin-record-reformer: #{e.message}"
    end
  end

  # v0.12 and later ship an expander with the record_transformer filter.
  begin
    require 'fluent/plugin/filter_record_transformer'
    expander_class =
      if defined?(Fluent::Plugin::RecordTransformerFilter::PlaceholderExpander)
        # v0.14 namespace
        Fluent::Plugin::RecordTransformerFilter::PlaceholderExpander
      else
        # v0.12 namespace
        Fluent::RecordTransformerFilter::PlaceholderExpander
      end
    expander_class.new(log: log)
  rescue LoadError => e
    raise ConfigError, "cannot find filter_record_transformer plugin: #{e.message}"
  end
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
Fluent::Plugin::KinesisHelper::API::BatchRequest#configure
# File lib/fluent/plugin/out_kinesis_streams.rb, line 77
# Applies the superclass configuration, then prepares the per-instance helpers
# used later on: the placeholder expander, the local hostname and the
# partition-key formatter.
#
# conf - the plugin configuration, forwarded untouched to the superclass.
def configure(conf)
  super
  # NOTE: the check_conflict and prepare_key_fields steps are disabled here.
  expander = Fluent::Plugin::KinesisStreamsOutput.placeholder_expander(log)
  @placeholder_expander = expander
  @hostname = Socket.gethostname
  @key_formatter = key_formatter_create
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_kinesis_streams.rb, line 87
# Builds the event payload for one record and passes it through the data and
# key formatters inside format_for_api.
#
# tag    - event tag; exposed to templates as 'tag' and 'tag_parts'.
# time   - event time; Integers are kept as-is, anything else goes through to_f
#          (keeps sub-second precision on v0.14).
# record - the event record; read with String keys (@data_key, "message").
#
# Returns whatever format_for_api returns for the [data, key] pair.
def format(tag, time, record)
  placeholder_values = {
    'tag' => tag,
    'tag_parts' => tag.split('.'),
    'hostname' => @default_host,
    'record' => record
  }
  placeholders = @placeholder_expander.prepare_placeholders(placeholder_values)
  expand = ->(template) { @placeholder_expander.expand(template.to_s, placeholders) }

  # Unset sourcetype/index must stay nil here. Stringifying them first (as the
  # previous code did) turned nil into "" and made the pruning step below a
  # no-op, so empty fields were always emitted.
  payload = {
    time: time.is_a?(Integer) ? time : time.to_f,
    source: @source.nil? ? tag.to_s : expand.call(@source),
    sourcetype: @sourcetype.nil? ? nil : expand.call(@sourcetype),
    host: @host.nil? ? @default_host : expand.call(@host),
    index: @index.nil? ? nil : expand.call(@index)
  }

  # Drop nil fields; per the original note, HEC otherwise reports a format error.
  %i[host index source sourcetype].each { |field| payload.delete(field) if payload[field].nil? }

  data_value = record[@data_key]
  payload[@data_key] = data_value unless data_value.nil?

  # Ship the whole record when @all_items is set, otherwise only its "message".
  payload[:event] = @all_items ? record : record["message"]

  format_for_api do
    data = @data_formatter.call(tag, time, payload)
    key = @key_formatter.call(payload)
    [data, key]
  end
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_kinesis_streams.rb, line 130
# Sends the chunk to the stream named by @stream_name, one put_records call
# per batch yielded by write_records_batch.
#
# chunk - the buffer chunk, handed straight to write_records_batch.
def write(chunk)
  write_records_batch(chunk) do |batch|
    # Each batch entry is a [data, partition_key] pair.
    client.put_records(
      stream_name: @stream_name,
      records: batch.map do |(data, partition_key)|
        { data: data, partition_key: partition_key }
      end
    )
  end
end
Private Instance Methods
key_formatter_create()
click to toggle source
# File lib/fluent/plugin/out_kinesis_streams.rb, line 144
# Builds the one-argument lambda that produces a partition key for a record.
#
# With no @partition_key set, the lambda ignores the record and returns a
# fresh SecureRandom.hex(16) string on every call. Otherwise it returns the
# record's value stored under @partition_key.
#
# The keyed lambda raises KeyNotFoundError when the record has no such key.
def key_formatter_create
  return ->(_record) { SecureRandom.hex(16) } if @partition_key.nil?

  lambda do |record|
    raise KeyNotFoundError.new(@partition_key, record) unless record.key?(@partition_key)

    record[@partition_key]
  end
end