class FluentPluginFirehose::FirehoseBufferedOutput
Constants
- PUT_RECORD_BATCH_MAX_COUNT
- PUT_RECORD_BATCH_MAX_DATA_SIZE
- PUT_RECORD_MAX_DATA_SIZE
- USER_AGENT_NAME
Public Instance Methods
build_data_to_put(data)
click to toggle source
# File lib/fluent/plugin/out_firehose.rb, line 178
# Builds the hash that is handed to the Firehose client for one record.
#
# Every key is converted with +to_sym+. When @zlib_compression is truthy, the
# value stored under the string key "data" is replaced by the output of
# Zlib::Deflate.deflate; every other value is passed through unchanged.
#
# @param data [Hash] key/value pairs of a single decoded record
# @return [Hash] a new hash with symbolized keys
def build_data_to_put(data)
  data.each_with_object({}) do |(key, value), converted|
    # Only the string key "data" is compressed (a Symbol key never matches).
    compress = @zlib_compression && key == "data"
    converted[key.to_sym] = compress ? Zlib::Deflate.deflate(value) : value
  end
end
build_records_array_to_put(data_list)
click to toggle source
# File lib/fluent/plugin/out_firehose.rb, line 202
# Splits a flat list of records into batches for put_record_batch.
#
# A new batch is started when the current one already holds
# PUT_RECORD_BATCH_MAX_COUNT records, or when adding the next payload would
# bring the accumulated payload size to PUT_RECORD_BATCH_MAX_DATA_SIZE or more.
#
# Payload size is measured with String#bytesize rather than String#length,
# because the service limits are expressed in bytes and multibyte characters
# would otherwise be undercounted. An empty batch is never emitted, even when
# a single payload on its own reaches the size threshold.
#
# @param data_list [Enumerable<Hash>] records, each with a String under :data
# @return [Array<Array<Hash>>] batches in the original record order
def build_records_array_to_put(data_list)
  records_array = []
  records = []
  records_payload_size = 0

  data_list.each do |data_to_put|
    payload_size = data_to_put[:data].bytesize
    batch_full = records.length >= PUT_RECORD_BATCH_MAX_COUNT ||
                 (records_payload_size + payload_size) >= PUT_RECORD_BATCH_MAX_DATA_SIZE

    # Flush only a non-empty batch; flushing an empty one would later produce
    # a request with no records.
    if batch_full && !records.empty?
      records_array.push(records)
      records = []
      records_payload_size = 0
    end

    records.push(data_to_put)
    records_payload_size += payload_size
  end

  records_array.push(records) unless records.empty?
  records_array
end
calculate_sleep_duration(current_retry)
click to toggle source
# File lib/fluent/plugin/out_firehose.rb, line 260
# Returns the back-off delay for the given retry attempt.
#
# The delay is (2 ** current_retry) * scaling_factor. The value is computed
# directly instead of materializing the whole schedule, so scaling_factor is
# evaluated exactly once per call.
#
# @param current_retry [Integer] zero-based retry attempt
# @return [Numeric, nil] the delay, or nil when current_retry is negative or
#   not below @retries_on_putrecordbatch
def calculate_sleep_duration(current_retry)
  retry_limit = @retries_on_putrecordbatch.to_i
  return nil unless current_retry >= 0 && current_retry < retry_limit

  (2**current_retry) * scaling_factor
end
check_connection_to_stream()
click to toggle source
# File lib/fluent/plugin/out_firehose.rb, line 174
# Asks the client to describe the configured delivery stream.
#
# Calls @client.describe_delivery_stream with @delivery_stream_name and
# returns whatever that call returns; any exception it raises propagates.
def check_connection_to_stream
  stream_query = { delivery_stream_name: @delivery_stream_name }
  @client.describe_delivery_stream(**stream_query)
end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_firehose.rb, line 63
# Applies the plugin configuration and derives the dependent settings.
#
# After delegating to the superclass:
# * @parallel_mode becomes true when @detach_process is set or @num_threads
#   is greater than 1, and false otherwise;
# * @use_detach_multi_process_mixin is set to true when @detach_process is set;
# * in parallel mode @order_events is forced to false (with a warning if it
#   had been enabled);
# * @dump_class is Yajl when @use_yajl is set, MultiJson otherwise.
#
# @param conf the configuration object, forwarded unchanged to the superclass
def configure(conf)
  super

  @parallel_mode = (@detach_process || @num_threads > 1) ? true : false
  @use_detach_multi_process_mixin = true if @detach_process

  if @parallel_mode
    log.warn 'You have set "order_events" to true, however this configuration will be ignored due to "detach_process" and/or "num_threads".' if @order_events
    @order_events = false
  end

  @dump_class = @use_yajl ? Yajl : MultiJson
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_firehose.rb, line 95
# Serializes one event for the buffer.
#
# The record is dumped with @dump_class, a "\n" is appended when
# @append_newline is set, and the result is wrapped as { data: ... } and
# encoded with to_msgpack. The tag and time arguments are not used.
#
# @param tag unused
# @param time unused
# @param record the event record to serialize
# @return the msgpack encoding of the wrapped payload
def format(tag, time, record)
  line_ending = @append_newline ? "\n" : ""
  serialized = @dump_class.dump(record) + line_ending
  { data: serialized }.to_msgpack
end
load_client()
click to toggle source
# File lib/fluent/plugin/out_firehose.rb, line 125
# Builds the Aws::Firehose::Client and stores it in @client.
#
# Client options are assembled in this order:
# 1. a user agent suffix made of USER_AGENT_NAME and the plugin version;
# 2. the region, when @region is set;
# 3. credentials, from the first source that is configured: static keys
#    (@aws_key_id and @aws_sec_key), a shared credentials profile (@profile,
#    optionally @credentials_path), or an assumed role (@role_arn);
# 4. a debug logger, when @debug is set;
# 5. an HTTP proxy, when @http_proxy is set.
#
# @return [Aws::Firehose::Client] the newly created client
def load_client
  options = { user_agent_suffix: "#{USER_AGENT_NAME}/#{FluentPluginFirehose::VERSION}" }
  options[:region] = @region if @region

  if @aws_key_id && @aws_sec_key
    options[:access_key_id] = @aws_key_id
    options[:secret_access_key] = @aws_sec_key
  elsif @profile
    shared_opts = { profile_name: @profile }
    shared_opts[:path] = @credentials_path if @credentials_path
    options[:credentials] = Aws::SharedCredentials.new(shared_opts)
  elsif @role_arn
    # NOTE(review): the STS client receives the options gathered so far, i.e.
    # before :logger and :http_proxy are added below - confirm that the
    # assume-role call is not expected to go through the proxy.
    sts_client = Aws::STS::Client.new(options)
    options[:credentials] = Aws::AssumeRoleCredentials.new(
      client: sts_client,
      role_arn: @role_arn,
      role_session_name: "aws-fluent-plugin-firehose",
      external_id: @external_id,
      duration_seconds: 60 * 60
    )
  end

  if @debug
    options[:logger] = Logger.new(log.out)
    options[:log_level] = :debug
    # XXX: Add the following options, if necessary
    # :http_wire_trace => true
  end

  options[:http_proxy] = @http_proxy if @http_proxy

  @client = Aws::Firehose::Client.new(options)
end
put_record_batch_with_retry(records, retry_count=0)
click to toggle source
# File lib/fluent/plugin/out_firehose.rb, line 220
# Sends one batch through @client.put_record_batch and retries the entries
# the response reports as failed.
#
# An entry of response[:request_responses] counts as failed when it carries
# an :error_code; it is matched to the input record at the same index. While
# retry_count is below @retries_on_putrecordbatch, the failures are logged at
# info level, the method sleeps for calculate_sleep_duration(retry_count) and
# then calls itself with only the failed records. Once the limit is reached,
# each remaining failure is logged at error level and dropped.
#
# @param records [Array<Hash>] the records of a single batch
# @param retry_count [Integer] number of retries already performed
def put_record_batch_with_retry(records, retry_count=0)
  response = @client.put_record_batch(
    records: records,
    delivery_stream_name: @delivery_stream_name
  )
  failed_count = response[:failed_put_count]
  log.info sprintf('Put record batch, Sent %d records and failed %d records', records.length, failed_count.nil? ? 0 : failed_count)
  return unless failed_count && failed_count > 0

  # Pair every failed response entry with the record sent at the same index.
  failures = response[:request_responses].each_with_index
                                         .select { |entry, _index| entry[:error_code] }
                                         .map { |entry, index|
    { error_code: entry[:error_code], error_message: entry[:error_message], body: records[index] }
  }

  if retry_count >= @retries_on_putrecordbatch
    failures.each { |failure|
      log.error sprintf('Could not put record, error_code: %s, error_message: %s, record: %s', failure[:error_code], failure[:error_message], @dump_class.dump(failure[:body]))
    }
  else
    failures.each { |failure|
      log.info sprintf('Put record attempt failed, error_code: %s, error_message: %s, record: %s', failure[:error_code], failure[:error_message], @dump_class.dump(failure[:body]))
    }
    sleep(calculate_sleep_duration(retry_count))
    next_retry = retry_count + 1
    log.warn sprintf('Retrying to put %d records, Retry count: %d', failures.length, next_retry)
    put_record_batch_with_retry(failures.map { |failure| failure[:body] }, next_retry)
  end
end
put_record_for_order_events(data_list)
click to toggle source
# File lib/fluent/plugin/out_firehose.rb, line 186
# Sends the records one at a time, in list order, with @client.put_record.
#
# Each request has the shape the Firehose PutRecord operation expects:
# +delivery_stream_name+ plus a +record+ hash holding the payload under
# +data+. The previous implementation sent the Kinesis Streams shape
# (top-level +data+ and +sequence_number_for_ordering+) and read
# +sequence_number+ from the response; Firehose has neither field, so
# ordering here relies only on issuing the calls sequentially.
#
# The hashes in data_list are not modified.
#
# @param data_list [Array<Hash>] records, each with the payload under :data
# @return [Array<Hash>] data_list itself
def put_record_for_order_events(data_list)
  data_list.each do |data_to_put|
    @client.put_record(
      delivery_stream_name: @delivery_stream_name,
      record: { data: data_to_put[:data] }
    )
  end
end
record_exceeds_max_size?(record_string)
click to toggle source
# File lib/fluent/plugin/out_firehose.rb, line 268
# Tells whether a payload is larger than PUT_RECORD_MAX_DATA_SIZE.
#
# The size is taken with String#bytesize rather than String#length: the
# limit applies to bytes, and a character count would let payloads with
# multibyte characters slip past the check.
#
# @param record_string [String] the payload to measure
# @return [Boolean] true when the payload is strictly larger than the limit
def record_exceeds_max_size?(record_string)
  record_string.bytesize > PUT_RECORD_MAX_DATA_SIZE
end
scaling_factor()
click to toggle source
# File lib/fluent/plugin/out_firehose.rb, line 264
# Returns a randomized multiplier: 0.5 plus a random share of 0.1, drawn
# with Kernel.rand on every call.
#
# @return [Float]
def scaling_factor
  jitter = Kernel.rand * 0.1
  0.5 + jitter
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_firehose.rb, line 85
# Starts the output inside detach_multi_process.
#
# Within that block the superclass start runs first, then the client is
# created with load_client, and - only when @ensure_stream_connection is
# set - check_connection_to_stream is called.
def start
  detach_multi_process do
    super
    load_client
    check_connection_to_stream if @ensure_stream_connection
  end
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_firehose.rb, line 103
# Delivers the contents of one buffer chunk.
#
# Every entry yielded by chunk.msgpack_each is converted with
# build_data_to_put. Entries whose :data exceeds the per-record limit are
# logged at error level and left out. The remaining records are sent either
# one by one (put_record_for_order_events, when @order_events is set) or in
# batches built by build_records_array_to_put and sent with
# put_record_batch_with_retry.
#
# @param chunk a buffer chunk responding to msgpack_each
def write(chunk)
  converted = chunk.to_enum(:msgpack_each).map { |record| build_data_to_put(record) }

  data_list = converted.select do |record|
    oversized = record_exceeds_max_size?(record[:data])
    log.error sprintf('Record exceeds the %.3f KB(s) per-record size limit and will not be delivered: %s', PUT_RECORD_MAX_DATA_SIZE / 1024.0, record[:data]) if oversized
    !oversized
  end

  if @order_events
    put_record_for_order_events(data_list)
  else
    build_records_array_to_put(data_list).each { |records| put_record_batch_with_retry(records) }
  end
end