class FluentPluginFirehose::FirehoseBufferedOutput

Constants

PUT_RECORD_BATCH_MAX_COUNT
PUT_RECORD_BATCH_MAX_DATA_SIZE
PUT_RECORD_MAX_DATA_SIZE
USER_AGENT_NAME

Public Instance Methods

build_data_to_put(data) click to toggle source
# File lib/fluent/plugin/out_firehose.rb, line 178
# Prepares one decoded record for the Firehose API: symbolizes every key
# and, when zlib compression is enabled, deflates the payload stored under
# the "data" key (other values pass through untouched).
def build_data_to_put(data)
  data.each_with_object({}) do |(key, value), prepared|
    prepared[key.to_sym] =
      if @zlib_compression && key == "data"
        Zlib::Deflate.deflate(value)
      else
        value
      end
  end
end
build_records_array_to_put(data_list) click to toggle source
# File lib/fluent/plugin/out_firehose.rb, line 202
# Splits data_list into batches that respect the Firehose PutRecordBatch
# limits: fewer than PUT_RECORD_BATCH_MAX_COUNT records and under
# PUT_RECORD_BATCH_MAX_DATA_SIZE payload bytes per batch.
#
# Measures payloads with String#bytesize rather than String#length: the
# Firehose limits are byte limits, and #length counts characters, which
# undercounts multi-byte (e.g. UTF-8) payloads and could build oversized
# batches.
#
# @param data_list [Array<Hash>] records built by build_data_to_put
# @return [Array<Array<Hash>>] batches ready for put_record_batch
def build_records_array_to_put(data_list)
  records_array = []
  records = []
  records_payload_length = 0
  data_list.each do |data_to_put|
    payload_size = data_to_put[:data].bytesize
    # Close the current batch before it would exceed either limit.
    if records.length >= PUT_RECORD_BATCH_MAX_COUNT ||
       (records_payload_length + payload_size) >= PUT_RECORD_BATCH_MAX_DATA_SIZE
      records_array.push(records)
      records = []
      records_payload_length = 0
    end
    records.push(data_to_put)
    records_payload_length += payload_size
  end
  records_array.push(records) unless records.empty?
  records_array
end
calculate_sleep_duration(current_retry) click to toggle source
# File lib/fluent/plugin/out_firehose.rb, line 260
# Exponential-backoff delay (seconds) for the given retry attempt,
# jittered by scaling_factor: (2 ** current_retry) * scaling_factor.
#
# Computed directly instead of materializing the whole backoff table
# (one Array allocation plus one rand call per slot) on every invocation
# just to index a single entry, as the original did. Callers guard
# current_retry < @retries_on_putrecordbatch before calling, so the
# original's nil result for out-of-range indices was never observed.
def calculate_sleep_duration(current_retry)
  (2 ** current_retry) * scaling_factor
end
check_connection_to_stream() click to toggle source
# File lib/fluent/plugin/out_firehose.rb, line 174
# Verifies the configured delivery stream is reachable by issuing a
# DescribeDeliveryStream call; the AWS SDK raises if the stream does not
# exist or credentials are invalid. Called from #start when
# @ensure_stream_connection is enabled.
def check_connection_to_stream
  @client.describe_delivery_stream(delivery_stream_name: @delivery_stream_name)
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_firehose.rb, line 63
# Fluentd configuration hook. Derives the parallel-execution flags from
# "detach_process" / "num_threads", disables event ordering when running
# in parallel (ordering cannot be guaranteed across processes or threads),
# and picks the JSON serializer class.
def configure(conf)
  super

  @parallel_mode = !!(@detach_process || @num_threads > 1)
  @use_detach_multi_process_mixin = true if @detach_process

  if @parallel_mode
    if @order_events
      log.warn 'You have set "order_events" to true, however this configuration will be ignored due to "detach_process" and/or "num_threads".'
    end
    @order_events = false
  end

  @dump_class = @use_yajl ? Yajl : MultiJson
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_firehose.rb, line 95
# Serializes one event for buffering: JSON-encodes the record (appending a
# trailing newline when @append_newline is set) and msgpack-packs it under
# the :data key the Firehose put methods expect.
def format(tag, time, record)
  payload = @dump_class.dump(record)
  payload += "\n" if @append_newline
  { data: payload }.to_msgpack
end
load_client() click to toggle source
# File lib/fluent/plugin/out_firehose.rb, line 125
# Builds the Aws::Firehose::Client used by this output and stores it in
# @client. Credentials are resolved in priority order: explicit key pair,
# shared-credentials profile, then IAM role assumption; when none is
# configured the SDK's default provider chain applies.
def load_client
  options = {
    user_agent_suffix: "#{USER_AGENT_NAME}/#{FluentPluginFirehose::VERSION}"
  }
  options[:region] = @region if @region

  if @aws_key_id && @aws_sec_key
    options[:access_key_id] = @aws_key_id
    options[:secret_access_key] = @aws_sec_key
  elsif @profile
    credentials_opts = { profile_name: @profile }
    credentials_opts[:path] = @credentials_path if @credentials_path
    options[:credentials] = Aws::SharedCredentials.new(credentials_opts)
  elsif @role_arn
    # The STS client reuses the options built so far (region, user agent).
    options[:credentials] = Aws::AssumeRoleCredentials.new(
      client: Aws::STS::Client.new(options),
      role_arn: @role_arn,
      role_session_name: "aws-fluent-plugin-firehose",
      external_id: @external_id,
      duration_seconds: 60 * 60
    )
  end

  if @debug
    options[:logger] = Logger.new(log.out)
    options[:log_level] = :debug
    # XXX: Add the following options, if necessary
    # :http_wire_trace => true
  end

  options[:http_proxy] = @http_proxy if @http_proxy

  @client = Aws::Firehose::Client.new(options)
end
put_record_batch_with_retry(records, retry_count=0) click to toggle source
# File lib/fluent/plugin/out_firehose.rb, line 220
# Sends one batch via PutRecordBatch and retries any records the service
# rejected, with jittered exponential backoff between attempts. Records
# still failing after @retries_on_putrecordbatch attempts are logged at
# error level and dropped (best-effort delivery).
def put_record_batch_with_retry(records, retry_count=0)
  response = @client.put_record_batch(
    records: records,
    delivery_stream_name: @delivery_stream_name
  )
  failed_count = response[:failed_put_count].nil? ? 0 : response[:failed_put_count]
  log.info sprintf('Put record batch, Sent %d records and failed %d records',
             records.length, failed_count)

  return unless response[:failed_put_count] && response[:failed_put_count] > 0

  # Pair each per-record response with its original payload so exactly the
  # failed records can be resent (or reported).
  failed_records = []
  response[:request_responses].each_with_index do |record, index|
    next unless record[:error_code]
    failed_records.push(
      error_code: record[:error_code],
      error_message: record[:error_message],
      body: records[index]
    )
  end

  if retry_count < @retries_on_putrecordbatch
    failed_records.each do |record|
      log.info sprintf('Put record attempt failed, error_code: %s, error_message: %s, record: %s',
                       record[:error_code], record[:error_message], @dump_class.dump(record[:body]))
    end

    sleep(calculate_sleep_duration(retry_count))
    retry_count += 1
    log.warn sprintf('Retrying to put %d records, Retry count: %d', failed_records.length, retry_count)
    put_record_batch_with_retry(failed_records.map { |record| record[:body] }, retry_count)
  else
    failed_records.each do |record|
      log.error sprintf('Could not put record, error_code: %s, error_message: %s, record: %s',
                        record[:error_code], record[:error_message], @dump_class.dump(record[:body]))
    end
  end
end
put_record_for_order_events(data_list) click to toggle source
# File lib/fluent/plugin/out_firehose.rb, line 186
# Delivers records one at a time with PutRecord, chaining each response's
# sequence number into the next request's :sequence_number_for_ordering so
# Firehose preserves event order.
def put_record_for_order_events(data_list)
  previous_sequence_number = nil
  data_list.each do |data_to_put|
    if previous_sequence_number
      data_to_put[:sequence_number_for_ordering] = previous_sequence_number
    end
    data_to_put[:delivery_stream_name] = @delivery_stream_name
    result = @client.put_record(data_to_put)
    previous_sequence_number = result[:sequence_number]
  end
end
record_exceeds_max_size?(record_string) click to toggle source
# File lib/fluent/plugin/out_firehose.rb, line 268
# True when a single record's payload is too large for one Firehose
# PutRecord / PutRecordBatch entry.
#
# Uses String#bytesize rather than #length: the Firehose per-record limit
# is a byte limit, and #length counts characters, which undercounts
# multi-byte (e.g. UTF-8) payloads and would let oversized records through.
def record_exceeds_max_size?(record_string)
  record_string.bytesize > PUT_RECORD_MAX_DATA_SIZE
end
scaling_factor() click to toggle source
# File lib/fluent/plugin/out_firehose.rb, line 264
# Random jitter multiplier in [0.5, 0.6) applied to backoff delays so
# concurrent writers do not retry in lockstep.
def scaling_factor
  jitter = Kernel.rand * 0.1
  jitter + 0.5
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_firehose.rb, line 85
# Fluentd lifecycle hook: boots the output (optionally inside a detached
# child process), builds the Firehose client, and — when
# @ensure_stream_connection is set — verifies the delivery stream is
# reachable before accepting chunks.
def start
  detach_multi_process do
    super
    load_client
    check_connection_to_stream if @ensure_stream_connection
  end
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_firehose.rb, line 103
# Buffered-output flush hook: decodes the chunk's msgpack records, drops
# (and logs) any record over the per-record size limit, then delivers the
# rest either strictly ordered (one PutRecord per event) or batched via
# PutRecordBatch.
def write(chunk)
  data_list = chunk.to_enum(:msgpack_each).map { |record| build_data_to_put(record) }
  data_list = data_list.select do |record|
    if record_exceeds_max_size?(record[:data])
      log.error sprintf('Record exceeds the %.3f KB(s) per-record size limit and will not be delivered: %s', PUT_RECORD_MAX_DATA_SIZE / 1024.0, record[:data])
      false
    else
      true
    end
  end

  if @order_events
    put_record_for_order_events(data_list)
  else
    build_records_array_to_put(data_list).each do |records|
      put_record_batch_with_retry(records)
    end
  end
end