class Fluent::Plugin::KinesisStreamsOutput

Constants

BatchRequestLimitCount
BatchRequestLimitSize
RequestType

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_kinesis_streams.rb, line 45
# Creates the output plugin instance.
#
# Runs the superclass initializer first, then records the local machine's
# hostname in @default_host.
def initialize
  super()
  @default_host = Socket.gethostname
end
placeholder_expander(log) click to toggle source

Thanks to github.com/kazegusuri/fluent-plugin-prometheus/blob/348c112d/lib/fluent/plugin/prometheus.rb

# File lib/fluent/plugin/out_kinesis_streams.rb, line 52
# Builds a PlaceholderExpander by borrowing the implementation that ships with
# (or alongside) the running Fluentd version.
#
# log - logger object handed to the expander as its :log option.
#
# Returns a new PlaceholderExpander instance.
# Raises ConfigError when the file providing the expander cannot be loaded.
def self.placeholder_expander(log)
  unless defined?(Fluent::Filter)
    # v0.10: no built-in expander, so rely on the PlaceholderExpander from
    # the fluent-plugin-record-reformer plugin.
    begin
      require 'fluent/plugin/out_record_reformer.rb'
      return Fluent::RecordReformerOutput::PlaceholderExpander.new(log: log)
    rescue LoadError => load_error
      raise ConfigError, "cannot find fluent-plugin-record-reformer: #{load_error.message}"
    end
  end

  # v0.12 and later: the expander is built in (filter_record_transformer).
  begin
    require 'fluent/plugin/filter_record_transformer'
    expander_class =
      if defined?(Fluent::Plugin::RecordTransformerFilter::PlaceholderExpander)
        # v0.14 namespace
        Fluent::Plugin::RecordTransformerFilter::PlaceholderExpander
      else
        # v0.12 namespace
        Fluent::RecordTransformerFilter::PlaceholderExpander
      end
    expander_class.new(log: log)
  rescue LoadError => load_error
    raise ConfigError, "cannot find filter_record_transformer plugin: #{load_error.message}"
  end
end

Public Instance Methods

configure(conf) click to toggle source
# File lib/fluent/plugin/out_kinesis_streams.rb, line 77
# Applies the plugin configuration.
#
# conf - configuration object, passed through unchanged to the superclass.
#
# After the superclass has processed conf, this stores three pieces of state:
# the placeholder expander (@placeholder_expander), the local hostname
# (@hostname) and the partition-key formatter (@key_formatter).
def configure(conf)
  super(conf)
  # NOTE: the check_conflict and prepare_key_fields calls remain disabled here.
  expander = Fluent::Plugin::KinesisStreamsOutput.placeholder_expander(log)
  @placeholder_expander = expander
  @hostname = Socket.gethostname
  @key_formatter = key_formatter_create
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_kinesis_streams.rb, line 87
# Converts one event into the [data, partition_key] pair for the API.
#
# tag    - event tag (String); also used as the source when @source is unset.
# time   - event time; Integers are kept as-is, anything else is converted
#          with to_f (v0.14 sub-second precision).
# record - event record (Hash).
#
# The payload always carries :time and :event. :source and :host fall back
# to the tag and @default_host when unset. :sourcetype and :index are only
# included when configured: an unset option used to be stringified to "" and
# so slipped past the nil cleanup, sending empty fields downstream.
#
# Returns whatever format_for_api returns for the [data, key] pair produced
# by @data_formatter and @key_formatter.
def format(tag, time, record)
  placeholder_values = {
    'tag' => tag,
    'tag_parts' => tag.split('.'),
    'hostname' => @default_host,
    'record' => record
  }
  placeholders = @placeholder_expander.prepare_placeholders(placeholder_values)

  # Expands placeholders in a configured option. An unset (nil) option stays
  # nil so that it is dropped from the payload below instead of becoming "".
  expand_option = lambda do |template|
    @placeholder_expander.expand(template.to_s, placeholders) unless template.nil?
  end

  payload = {
    # for v0.14 millisecs time precision
    time: time.is_a?(Integer) ? time.to_i : time.to_f,
    source: @source.nil? ? tag.to_s : expand_option.call(@source),
    sourcetype: expand_option.call(@sourcetype),
    host: @host.nil? ? @default_host : expand_option.call(@host),
    index: expand_option.call(@index)
  }

  # delete nil fields, otherwise HEC reports a format error
  %i[host index source sourcetype].each { |field| payload.delete(field) if payload[field].nil? }

  # Copy the configured data key through when the record carries a value for it.
  payload[@data_key] = record[@data_key] unless record[@data_key].nil?

  # Ship either the whole record or only its "message" field.
  payload[:event] = @all_items ? record : record["message"]

  format_for_api do
    data = @data_formatter.call(tag, time, payload)
    key = @key_formatter.call(payload)
    [data, key]
  end
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_kinesis_streams.rb, line 130
# Sends a buffered chunk to the stream.
#
# chunk - buffer chunk, handed to write_records_batch, which yields batches
#         of [data, partition_key] pairs.
#
# Each batch is turned into a list of { data:, partition_key: } hashes and
# submitted with a single client.put_records call against @stream_name.
def write(chunk)
  write_records_batch(chunk) do |batch|
    entries = batch.map do |data, partition_key|
      { data: data, partition_key: partition_key }
    end
    client.put_records(stream_name: @stream_name, records: entries)
  end
end

Private Instance Methods

key_formatter_create() click to toggle source
# File lib/fluent/plugin/out_kinesis_streams.rb, line 144
# Builds the lambda that derives a partition key from a record.
#
# With no @partition_key configured, the lambda ignores the record and
# returns a fresh SecureRandom.hex(16) value on every call. Otherwise it
# returns the record's value stored under @partition_key.
#
# Returns a one-argument lambda.
# The lambda raises KeyNotFoundError when the record lacks @partition_key.
def key_formatter_create
  return ->(_record) { SecureRandom.hex(16) } if @partition_key.nil?

  lambda do |record|
    raise KeyNotFoundError.new(@partition_key, record) unless record.key?(@partition_key)

    record[@partition_key]
  end
end