class Fluent::HoneycombOutput

Public Instance Methods

configure(conf) click to toggle source

This method is called before starting. 'conf' is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.

Calls superclass method
# File lib/fluent/plugin/out_honeycomb.rb, line 27
def configure(conf)
  # Override fluentd's weak buffering defaults with saner values, but
  # never clobber anything the user set explicitly in the configuration.
  defaults = {
    "buffer_chunk_limit" => "500k",
    "flush_interval" => "1s",
    "max_retry_wait" => "30s",
    "retry_limit" => 17,
  }
  defaults.each { |key, value| conf[key] ||= value }
  super
end
flatten(record, prefix) click to toggle source
# File lib/fluent/plugin/out_honeycomb.rb, line 174
# Recursively flattens a nested Hash into a single-level Hash whose keys
# are dot-joined paths, e.g. flatten({"b" => 1}, "a") => {"a.b" => 1}.
#
# record - the value to flatten (Hash values recurse; anything else is a leaf)
# prefix - the dot-joined key path accumulated so far
#
# Returns a flat Hash mapping "prefix.key1.key2..." => leaf value.
def flatten(record, prefix)
  # Non-Hash values are leaves: emit them under the accumulated path.
  return { prefix => record } unless record.is_a?(Hash)

  # String interpolation already calls #to_s on the key, so the original
  # explicit key.to_s was redundant; symbol keys still work.
  record.each_with_object({}) do |(key, value), flat|
    flat.merge!(flatten(value, "#{prefix}.#{key}"))
  end
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_honeycomb.rb, line 49
# Serializes one event as a msgpack-encoded [tag, time, record] triple
# for buffering; write() later decodes these with chunk.msgpack_each.
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
parse_response(batch, resp) click to toggle source
# File lib/fluent/plugin/out_honeycomb.rb, line 133
# Parses a Honeycomb batch API response and determines which events must
# be retried.
#
# batch - the Array of event hashes that was sent
# resp  - the HTTP response (must respond to #status and #body)
#
# Returns an Array of events to retry: the entire batch on a non-200
# response, otherwise the subset whose per-event status was not 202.
# Raises JSON::ParserError when the body is not valid JSON, and
# RuntimeError when it parses but is not the expected Array shape.
def parse_response(batch, resp)
  if resp.status != 200
    log.error "Error sending batch: #{resp.status}, #{resp.body}"
    # Force retry by returning the entire batch
    return batch
  end

  begin
    results = JSON.parse(resp.body.to_s)
  rescue JSON::ParserError => e
    log.warn "Error parsing response as JSON: #{e}"
    raise e
  end
  successes = 0
  failures = []
  if !results.is_a?(Array)
    # Fixed: the logger has #warn, not #warning — the original call would
    # itself have raised NoMethodError on this branch.
    log.warn "Unexpected response format: #{results}"
    # Raise RuntimeError (a StandardError) rather than bare Exception so
    # fluentd's normal rescue/retry handling can observe the failure.
    raise "Unexpected response format: #{resp.status}"
  end

  # The response array is positionally aligned with the request batch.
  results.each_with_index do |result, idx|
    if !result.is_a?(Hash)
      # Fixed: #warn, not #warning (see above).
      log.warn "Unexpected status format in response: #{result}"
      next
    end

    # Honeycomb reports 202 Accepted per event on success.
    if result["status"] == 202
      successes += 1
    else
      failures.push(batch[idx])
    end
  end

  if failures.size > 0
    log.warn "Errors publishing records: #{failures.size} failures out of #{successes + failures.size}"
  else
    log.debug "Successfully published #{successes} records"
  end
  return failures
end
publish_batch(dataset, batch, retry_count) click to toggle source
# File lib/fluent/plugin/out_honeycomb.rb, line 112
# Sends one batch of events to the Honeycomb batch API for `dataset`,
# then re-publishes any events that failed, up to @retry_limit attempts.
#
# dataset     - the Honeycomb dataset name (interpolated into the URL path)
# batch       - Array of {"data", "samplerate", "time"} event hashes,
#               as built by write()
# retry_count - retries already performed; external callers pass 0
def publish_batch(dataset, batch, retry_count)
  if batch.length == 0
    return
  end
  log.info "publishing #{batch.length} records to dataset #{dataset}"
  body = JSON.dump(batch)
  resp = HTTP.headers(
      "User-Agent" => "fluent-plugin-honeycomb/#{HONEYCOMB_PLUGIN_VERSION}",
      "Content-Type" => "application/json",
      "X-Honeycomb-Team" => @writekey)
      .post(URI.join(@api_host, "/1/batch/#{dataset}"), {
          :body => body,
      })
  # parse_response returns only the events that should be retried.
  failures = parse_response(batch, resp)
  if failures.size > 0 && retry_count < @retry_limit
    # sleep and retry with the set of failed events
    # NOTE(review): fixed 1s delay per attempt — @max_retry_wait configured
    # in configure() is not consulted here; recursion depth is bounded by
    # @retry_limit, and successes are never re-sent.
    sleep 1
    publish_batch(dataset, failures, retry_count + 1)
  end
end
shutdown() click to toggle source

This method is called when shutting down. Shutdown the thread and close sockets or files here.

Calls superclass method
# File lib/fluent/plugin/out_honeycomb.rb, line 45
# Fluentd shutdown hook. This plugin holds no sockets, threads, or files
# of its own, so it simply defers to the superclass for buffer teardown.
def shutdown
  super
end
start() click to toggle source

This method is called when starting. Open sockets or files here.

Calls superclass method
# File lib/fluent/plugin/out_honeycomb.rb, line 39
# Fluentd startup hook. No plugin-local resources need opening; defer to
# the superclass to start the buffering machinery.
def start
  super
end
write(chunk) click to toggle source

This method is called every flush interval. Write the buffer chunk to files or databases here. 'chunk' is a buffer chunk that includes multiple formatted events. You can use 'data = chunk.read' to get all events and 'chunk.open {|io| … }' to get IO objects.

NOTE! This method is called by internal thread, not Fluentd's main thread. So IO wait doesn't affect other plugins.

# File lib/fluent/plugin/out_honeycomb.rb, line 60
# Flush-time hook: decodes every buffered [tag, time, record] triple,
# applies sampling, tag injection, and key flattening, groups the events
# by destination dataset, and hands each group to publish_batch.
def write(chunk)
  # Per-key block default so each dataset gets its own array.
  per_dataset = Hash.new { |hash, key| hash[key] = [] }

  chunk.msgpack_each do |(tag, time, record)|
    unless record.is_a?(Hash)
      log.debug "Skipping record #{record}"
      next
    end

    if @presampled_key && record.include?(@presampled_key)
      # Upstream already sampled this event; trust (but validate) its rate.
      sample_rate = record.delete(@presampled_key)
      unless sample_rate.is_a?(Integer) && sample_rate >= 1
        log.warn "Record emitted a presampled key (#{@presampled_key} = #{sample_rate}), but was not a valid sample rate #{record}"

        sample_rate = 1
      end
    else
      sample_rate = @sample_rate
      # Keep roughly one in every @sample_rate events; drop the rest.
      next if @sample_rate > 1 && rand(1..@sample_rate) > 1
    end

    record[@tag_key] = tag if @include_tag_key

    # Flatten configured nested-hash keys into dotted top-level keys.
    @flatten_keys.each do |key|
      value = record[key]
      next unless value.is_a?(Hash)
      record.merge!(flatten(value, key))
      record.delete(key)
    end

    # The dataset can be routed per-record; the routing key is stripped
    # from the payload (Hash#delete returns the removed value).
    if @dataset_from_key != "" && record.has_key?(@dataset_from_key)
      dataset = record.delete(@dataset_from_key)
    else
      dataset = @dataset
    end

    per_dataset[dataset] << {
        "data" => record,
        "samplerate" => sample_rate,
        "time" => Time.at(time).utc.to_datetime.rfc3339
    }
  end

  per_dataset.each { |dataset, events| publish_batch(dataset, events, 0) }
end