class Fluent::HoneycombOutput
Public Instance Methods
configure(conf)
click to toggle source
This method is called before starting. 'conf' is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.
Calls superclass method
# File lib/fluent/plugin/out_honeycomb.rb, line 27
# Fills in this plugin's preferred buffering/retry settings and then hands
# the configuration to the superclass.
#
# A default is only written when the key is currently unset (nil/false), so
# anything explicitly specified in the configuration is left untouched.
#
# conf - Hash-like configuration object; it is modified in place.
#
# Returns whatever the superclass implementation returns.
def configure(conf)
  # Sane defaults that take precedence over the stock fluentd defaults.
  {
    "buffer_chunk_limit" => "500k",
    "flush_interval" => "1s",
    "max_retry_wait" => "30s",
    "retry_limit" => 17
  }.each do |setting, fallback|
    conf[setting] ||= fallback
  end

  super
end
flatten(record, prefix)
click to toggle source
# File lib/fluent/plugin/out_honeycomb.rb, line 174
# Collapses a nested Hash into a single-level Hash whose keys are the
# dot-joined path to each leaf, starting from +prefix+.
#
# record - the value to flatten; only Hash values are descended into.
# prefix - the key path accumulated so far.
#
# Returns a Hash. A non-Hash +record+ yields { prefix => record }; an empty
# Hash yields {} (the prefix does not appear in the result).
def flatten(record, prefix)
  return { prefix => record } unless record.is_a?(Hash)

  record.each_with_object({}) do |(name, nested), flat|
    flat.update(flatten(nested, "#{prefix}.#{name}"))
  end
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_honeycomb.rb, line 49
# Serializes one event for buffering.
#
# The tag, time and record are bundled into a three-element Array and that
# Array is encoded with #to_msgpack.
#
# Returns the value produced by Array#to_msgpack.
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
parse_response(batch, resp)
click to toggle source
# File lib/fluent/plugin/out_honeycomb.rb, line 133
# Interprets the response to a batch POST and works out which events of
# +batch+ still need to be sent.
#
# batch - Array of events that was submitted.
# resp  - response object; must answer #status and #body.
#
# Returns the entire +batch+ when the HTTP status is not 200 (forcing a full
# retry). Otherwise returns an Array of the batch entries whose per-event
# result did not carry status 202; entries are matched to results by index.
# A result that is not a Hash is logged and skipped: it is counted neither as
# a success nor as a failure.
#
# Raises JSON::ParserError if the body is not valid JSON.
# Raises RuntimeError if the decoded body is not an Array.
def parse_response(batch, resp)
  if resp.status != 200
    log.error "Error sending batch: #{resp.status}, #{resp.body}"
    # Force retry by returning the entire batch
    return batch
  end

  begin
    results = JSON.parse(resp.body.to_s)
  rescue JSON::ParserError => e
    log.warn "Error parsing response as JSON: #{e}"
    raise
  end

  unless results.is_a?(Array)
    # The logger has no #warning method; #warn is the level used elsewhere here.
    log.warn "Unexpected response format: #{results}"
    # Raise a StandardError subclass so ordinary `rescue => e` handlers see it.
    raise "Unexpected response format: #{resp.status}"
  end

  successes = 0
  failures = []
  results.each_with_index do |result, idx|
    unless result.is_a?(Hash)
      log.warn "Unexpected status format in response: #{result}"
      next
    end

    if result["status"] == 202
      successes += 1
    else
      failures.push(batch[idx])
    end
  end

  if failures.empty?
    log.debug "Successfully published #{successes} records"
  else
    log.warn "Errors publishing records: #{failures.size} failures out of #{successes + failures.size}"
  end

  failures
end
publish_batch(dataset, batch, retry_count)
click to toggle source
# File lib/fluent/plugin/out_honeycomb.rb, line 112
# POSTs +batch+ as JSON to the "/1/batch/<dataset>" endpoint under @api_host
# and re-sends whatever parse_response reports as failed.
#
# dataset     - dataset name, interpolated into the request path.
# batch       - Array of events; an empty batch is a no-op.
# retry_count - number of attempts already made for these events.
#
# Failed events are retried after a one-second pause for as long as
# retry_count is below @retry_limit; once the limit is reached the remaining
# failures are not re-sent by this method.
def publish_batch(dataset, batch, retry_count)
  return if batch.length == 0

  log.info "publishing #{batch.length} records to dataset #{dataset}"

  payload = JSON.dump(batch)
  client = HTTP.headers(
    "User-Agent" => "fluent-plugin-honeycomb/#{HONEYCOMB_PLUGIN_VERSION}",
    "Content-Type" => "application/json",
    "X-Honeycomb-Team" => @writekey
  )
  resp = client.post(URI.join(@api_host, "/1/batch/#{dataset}"), { :body => payload })

  failures = parse_response(batch, resp)
  return unless failures.size > 0 && retry_count < @retry_limit

  # sleep and retry with the set of failed events
  sleep 1
  publish_batch(dataset, failures, retry_count + 1)
end
shutdown()
click to toggle source
This method is called when shutting down. Shut down the thread and close sockets or files here.
Calls superclass method
# File lib/fluent/plugin/out_honeycomb.rb, line 45
# Shutdown hook. This plugin adds no teardown work of its own and simply
# delegates to the superclass implementation.
def shutdown
  super
end
start()
click to toggle source
This method is called when starting. Open sockets or files here.
Calls superclass method
# File lib/fluent/plugin/out_honeycomb.rb, line 39
# Startup hook. This plugin adds no setup work of its own and simply
# delegates to the superclass implementation.
def start
  super
end
write(chunk)
click to toggle source
This method is called every flush interval. Write the buffer chunk to files or databases here. 'chunk' is a buffer chunk that includes multiple formatted events. You can use 'data = chunk.read' to get all events and 'chunk.open {|io| … }' to get IO objects.
NOTE! This method is called by an internal thread, not Fluentd's main thread, so IO wait doesn't affect other plugins.
# File lib/fluent/plugin/out_honeycomb.rb, line 60
# Decodes a buffer chunk, turns each record into an event, groups the events
# by destination dataset and publishes every group.
#
# For each (tag, time, record) yielded by chunk.msgpack_each:
# * non-Hash records are logged at debug level and skipped;
# * if @presampled_key is set and present in the record, its value is removed
#   and used as the sample rate (falling back to 1, with a warning, unless it
#   is an Integer >= 1); otherwise @sample_rate is used and, when it is
#   greater than 1, the record is kept only when rand(1..@sample_rate) is 1;
# * the tag is stored under @tag_key when @include_tag_key is set;
# * every key in @flatten_keys holding a Hash is replaced by its flattened,
#   dot-joined entries;
# * the dataset is taken (and removed) from @dataset_from_key when that
#   setting is non-empty and the record has the key, else @dataset is used.
#
# Returns the Hash mapping each dataset to the Array of events built for it.
def write(chunk)
  by_dataset = Hash.new { |groups, name| groups[name] = [] }

  chunk.msgpack_each do |(tag, time, record)|
    unless record.is_a?(Hash)
      log.debug "Skipping record #{record}"
      next
    end

    if @presampled_key && record.include?(@presampled_key)
      rate = record.delete(@presampled_key)
      unless rate.is_a?(Integer) && rate >= 1
        log.warn "Record emitted a presampled key (#{@presampled_key} = #{rate}), but was not a valid sample rate #{record}"
        rate = 1
      end
    else
      rate = @sample_rate
      # Keep roughly one record in every @sample_rate.
      next if @sample_rate > 1 && rand(1..@sample_rate) > 1
    end

    record[@tag_key] = tag if @include_tag_key

    @flatten_keys.each do |key|
      nested = record[key]
      next unless nested.is_a?(Hash)

      record.merge!(flatten(nested, key))
      record.delete(key)
    end

    target =
      if @dataset_from_key != "" && record.has_key?(@dataset_from_key)
        record.delete(@dataset_from_key)
      else
        @dataset
      end

    event = {
      "data" => record,
      "samplerate" => rate,
      "time" => Time.at(time).utc.to_datetime.rfc3339
    }
    by_dataset[target].push(event)
  end

  by_dataset.each do |name, events|
    publish_batch(name, events, 0)
  end
end