class LogStash::Outputs::Graphite

This output allows you to pull metrics from your logs and ship them to Graphite. Graphite is an open source tool for storing and graphing metrics.

An example use case: Some applications emit aggregated stats in the logs every 10 seconds. Using the grok filter and this output, it is possible to capture the metric values from the logs and emit them to Graphite.

Constants

DEFAULT_METRICS_FORMAT
EXCLUDE_ALWAYS
METRIC_PLACEHOLDER

Public Instance Methods

construct_metric_name(event, metric) click to toggle source
# File lib/logstash/outputs/graphite.rb, line 134
def construct_metric_name(event, metric)
  if @metrics_format
    sprinted = event.sprintf(@metrics_format)
    return sprinted.gsub(METRIC_PLACEHOLDER, metric)
  end

  metric
end
receive(event) click to toggle source
# File lib/logstash/outputs/graphite.rb, line 143
def receive(event)
  # Graphite message format: metric value timestamp\n

  # compact to remove nil messages which produces useless \n
  messages = (
    @fields_are_metrics \
      ? messages_from_event_fields(event, @include_metrics, @exclude_metrics)
      : messages_from_event_metrics(event, @metrics)
  ).compact

  if messages.empty?
    @logger.debug? && @logger.debug("Message is empty, not sending anything to Graphite", :messages => messages)
  else
    message = messages.join("\n")
    @logger.debug? && @logger.debug("Sending carbon messages", :messages => messages)

    send(message)
  end
end
register() click to toggle source
# File lib/logstash/outputs/graphite.rb, line 86
def register
  @include_metrics.collect!{|regexp| Regexp.new(regexp)}
  @exclude_metrics.collect!{|regexp| Regexp.new(regexp)}

  if @metrics_format && !@metrics_format.include?(METRIC_PLACEHOLDER)
    @logger.warn("metrics_format does not include placeholder #{METRIC_PLACEHOLDER} .. falling back to default format: #{DEFAULT_METRICS_FORMAT.inspect}")

    @metrics_format = DEFAULT_METRICS_FORMAT
  end

  setup_hosts
end
send(message) click to toggle source
# File lib/logstash/outputs/graphite.rb, line 105
def send(message)
  numattempts = 0
  hosts_to_try = @hosts
  host = hosts_to_try.sample
  hosts_to_try.delete(host)

  begin
    address, _, port = host.rpartition(":")
    @logger.debug? && @logger.debug("Trying to send metrics to", :address => address, :port => port)
    TCPSocket.new(address, port).puts(message)
  rescue Errno::ECOPNNREFUSED, Errno::EPIPE, Errno::ECONNRESET, IOError
    if hosts_to_try.size > 0
      host = hosts_to_try.sample
      hosts_to_try.delete(host)
      retry
    elsif @resend_on_failure && numattempts < @resend_attempts
      sleep(@reconnect_interval)
      hosts_to_try = @hosts
      host = hosts_to_try.sample
      hosts_to_try.delete(host)
      numattempts += 1
      retry
    else
      @logger.warn("No more hosts to try, skip sending...")
    end
  end

end
setup_hosts() click to toggle source
# File lib/logstash/outputs/graphite.rb, line 99
def setup_hosts
  if @hosts && @hosts.size == 1 && @hosts[0] == "localhost:2003"
    @hosts.replace(["%s:%s" % [@host, @port]])
  end
end

Private Instance Methods

dotify(hash, prefix = nil) click to toggle source

Take a nested ruby hash of the form {:a => {:b => 2}, c: => 3} and turn it into a hash of the form { “a.b” => 2, “c” => 3}

# File lib/logstash/outputs/graphite.rb, line 212
def dotify(hash, prefix = nil)
  hash.reduce({}) do |acc, kv|
    k, v = kv
    pk = prefix ? "#{prefix}#{@nested_object_separator}#{k}" : k.to_s
    if v.is_a?(Hash)
      acc.merge!(dotify(v, pk))
    elsif v.is_a?(Array)
      # There's no right answer here, so we do nothing
      @logger.warn("Array values not supported for graphite metrics! Ignoring #{hash} @ #{prefix}")
    else
      acc[pk] = v
    end
    acc
  end
end
event_timestamp(event) click to toggle source
# File lib/logstash/outputs/graphite.rb, line 191
def event_timestamp(event)
  event.get(@timestamp_field).to_i
end
messages_from_event_fields(event, include_metrics, exclude_metrics) click to toggle source
# File lib/logstash/outputs/graphite.rb, line 165
def messages_from_event_fields(event, include_metrics, exclude_metrics)
  @logger.debug? && @logger.debug("got metrics event", :metrics => event.to_hash)

  timestamp = event_timestamp(event)
  event.to_hash.flat_map do |metric,value|
    next if EXCLUDE_ALWAYS.include?(metric)
    next unless include_metrics.empty? || include_metrics.any? { |regexp| metric.match(regexp) }
    next if exclude_metrics.any? {|regexp| metric.match(regexp)}

    metrics_lines_for_event(event, metric, value, timestamp)
  end
end
messages_from_event_metrics(event, metrics) click to toggle source
# File lib/logstash/outputs/graphite.rb, line 178
def messages_from_event_metrics(event, metrics)
  timestamp = event_timestamp(event)
  metrics.flat_map do |metric, value|
    @logger.debug? && @logger.debug("processing", :metric => metric, :value => value)

    metric = event.sprintf(metric)
    next unless @include_metrics.any? {|regexp| metric.match(regexp)}
    next if @exclude_metrics.any? {|regexp| metric.match(regexp)}

    metrics_lines_for_event(event, metric, value, timestamp)
  end
end
metrics_line(event, name, value, timestamp) click to toggle source
# File lib/logstash/outputs/graphite.rb, line 205
def metrics_line(event, name, value, timestamp)
  "#{construct_metric_name(event, name)} #{value} #{timestamp}"
end
metrics_lines_for_event(event, metric, value, timestamp) click to toggle source
# File lib/logstash/outputs/graphite.rb, line 195
def metrics_lines_for_event(event, metric, value, timestamp)
  if event.get(metric).is_a?(Hash)
    dotify(event.get(metric), metric).map do |k, v|
      metrics_line(event, k, v, timestamp)
    end
  else
    metrics_line(event, event.sprintf(metric), event.sprintf(value).to_f, timestamp)
  end
end