class Wavefront::Write

This class helps you send points to Wavefront. It is extended by the Write and Report classes, which respectively handle point ingestion by a proxy and directly to the API.

Attributes

creds[R]
logger[R]
opts[R]
writer[R]

Public Class Methods

new(creds = {}, opts = {}) click to toggle source

Construct an object which gives the user an interface for writing points to Wavefront. The actual writing is handled by

a Wavefront::Writer

subclass.

@param creds [Hash] credentials: can contain keys:

proxy [String] the address of the Wavefront proxy. ('wavefront')
port [Integer] the port of the Wavefront proxy

@param options [Hash] can contain the following keys:

tags [Hash] point tags which will be applied to every point
noop [Bool] if true, no proxy connection will be made, and
  instead of sending the points, they will be printed in
  Wavefront wire format.
novalidate [Bool] if true, points will not be validated.
  This might make things go marginally quicker if you have
  done point validation higher up in the chain. Invalid
  points are dropped, logged, and reported in the summary.
verbose [Bool]
debug [Bool]
writer [Symbol, String] the name of the writer class to use.
  Defaults to :proxy
noauto [Bool] if this is false, #write will automatically
  open a connection to Wavefront on each invocation. Set
  this to true to manually manage the connection.
chunk_size [Integer] So as not to create unusable metric
  payloads, large batches of points will be broken into
  chunks. This property controls the number of metrics in a
  chunk. Defaults to 1,000.
chunk_pause [Integer] pause this many seconds between
  writing chunks. Defaults to zero.
# File lib/wavefront-sdk/write.rb, line 53
def initialize(creds = {}, opts = {})
  @opts = setup_options(opts, defaults)
  @creds = creds
  wf_point_tags?(opts[:tags]) if opts[:tags]
  @logger = Wavefront::Logger.new(opts)
  @writer = setup_writer
end

Public Instance Methods

chunk_size() click to toggle source
# File lib/wavefront-sdk/write.rb, line 207
def chunk_size
  opts[:chunk_size] || writer.chunk_size
end
close() click to toggle source

Wrapper to the writer class's close method.

# File lib/wavefront-sdk/write.rb, line 87
def close
  writer.close
end
composite_response(responses) click to toggle source

Compound the responses of all chunked writes into one. It will be 'ok' only if everything passed. Returns 400 as the HTTP code on error, regardless of what actual errors occurred. @param responses [Array] @return Wavefront::Response

# File lib/wavefront-sdk/write.rb, line 115
def composite_response(responses)
  result, code = response_results(responses)

  summary = { sent: 0, rejected: 0, unsent: 0 }

  %i[sent rejected unsent].each do |k|
    summary[k] = responses.sum { |r| r.response[k] }
  end

  Wavefront::Response.new(
    { status: { result: result, message: nil, code: code },
      response: summary.to_h }.to_json, nil
  )
end
data_format() click to toggle source
# File lib/wavefront-sdk/write.rb, line 241
def data_format
  :wavefront
end
defaults() click to toggle source

Chunk size gets overriden

# File lib/wavefront-sdk/write.rb, line 63
def defaults
  { tags: nil,
    writer: :proxy,
    noop: false,
    novalidate: false,
    noauto: false,
    verbose: false,
    debug: false,
    chunk_pause: 0 }
end
hash_to_wf(point) click to toggle source

Convert a validated point to a string conforming to community.wavefront.com/docs/DOC-1031. No validation is done here.

@param point [Hash] a hash describing a point. See write() for

the format.
# File lib/wavefront-sdk/write.rb, line 218
def hash_to_wf(point)
  raise Wavefront::Exception::InvalidMetricName unless point[:path]
  raise Wavefront::Exception::InvalidMetricValue unless point[:value]

  format('%<path>s %<value>s %<ts>s source=%<source>s %<tags>s %<opttags>s',
         point_hash(point)).squeeze(' ').strip
end
manage_conn() click to toggle source
# File lib/wavefront-sdk/write.rb, line 138
def manage_conn
  opts[:noauto] ? false : true
end
open() click to toggle source

Wrapper to the writer class's open method. Using this you can manually open a connection and re-use it.

# File lib/wavefront-sdk/write.rb, line 81
def open
  writer.open
end
paths_to_deltas(points) click to toggle source

Prefix all paths in a points array (as passed to write_delta() with a delta symbol

@param points [Array] see write() @return [Array]

# File lib/wavefront-sdk/write.rb, line 161
def paths_to_deltas(points)
  [points].flatten.map { |p| p.tap { p[:path] = DELTA + p[:path] } }
end
point_hash(point) click to toggle source
# File lib/wavefront-sdk/write.rb, line 226
def point_hash(point)
  point.dup.tap do |p|
    p[:ts] ||= nil
    p[:source] ||= HOSTNAME
    p[:tags] = tags_or_nothing(p.fetch(:tags, nil))
    p[:opttags] = tags_or_nothing(opts.fetch(:tags, nil))
  end
end
raw(points, openclose = manage_conn) click to toggle source

Send raw data to a Wavefront proxy, optionally automatically opening and closing the connection. (Or not, if that does not make sense in the context of the writer.)

@param points [Array] an array of points in native

Wavefront wire format, as described in
https://community.wavefront.com/docs/DOC-1031. No validation
is performed.

@param openclose [Boolean] whether or not to automatically

open a socket to the proxy before sending points, and
afterwards, close it.
# File lib/wavefront-sdk/write.rb, line 191
def raw(points, openclose = manage_conn)
  writer.open if openclose && writer.respond_to?(:open)

  begin
    [points].flatten.each { |p| writer.send_point(p) }
  ensure
    writer.close if openclose && writer.respond_to?(:close)
  end
end
response_results(responses) click to toggle source
# File lib/wavefront-sdk/write.rb, line 130
def response_results(responses)
  if responses.all?(&:ok?)
    ['OK', 200]
  else
    ['ERROR', 400]
  end
end
send_point(point) click to toggle source

Wrapper for the writer class's send_point method @param point [String] a point description, probably from

#hash_to_wf()
# File lib/wavefront-sdk/write.rb, line 169
def send_point(point)
  if opts[:noop]
    logger.log "Would send: #{point}"
    return
  end

  logger.log("Sending: #{point}", :debug)
  writer.send_point(point)
end
setup_options(user, defaults) click to toggle source
# File lib/wavefront-sdk/write.rb, line 74
def setup_options(user, defaults)
  defaults.merge(user)
end
tags_or_nothing(tags) click to toggle source
# File lib/wavefront-sdk/write.rb, line 235
def tags_or_nothing(tags)
  return nil unless tags

  tags.to_wf_tag
end
validation() click to toggle source

The method used to validate the data we wish to write.

# File lib/wavefront-sdk/write.rb, line 203
def validation
  :wf_point?
end
write(points = [], openclose = manage_conn, prefix = nil) click to toggle source

A wrapper to the writer class's write method. Writers implement this method differently, Check the appropriate class documentation for @return information etc. The signature is always the same. @return [Boolean] false should any chunk fail @raise any exceptions raised by the writer classes are passed

through
# File lib/wavefront-sdk/write.rb, line 99
def write(points = [], openclose = manage_conn, prefix = nil)
  resps = [points].flatten.each_slice(chunk_size).map do |chunk|
    resp = writer.write(chunk, openclose, prefix)
    sleep(opts[:chunk_pause])
    resp
  end

  composite_response(resps)
end
write_delta(points, openclose = manage_conn) click to toggle source

A wrapper method around write() which guarantees all points will be sent as deltas. You can still manually prefix any metric with a delta symbol and use write(), but depending on your use-case, this method may be safer. It's easy to forget the delta.

@param points [Array] see write() @param openclose [Bool] see write()

# File lib/wavefront-sdk/write.rb, line 151
def write_delta(points, openclose = manage_conn)
  write(paths_to_deltas(points), openclose)
end

Private Instance Methods

setup_writer() click to toggle source

@return [Object] appropriate subclass of Wavefront::Writer @raise [Wavefront::Exception::UnsupportedWriter] if requested

writer cannot be loaded
# File lib/wavefront-sdk/write.rb, line 251
def setup_writer
  writer = opts[:writer].to_s
  require_relative File.join('writers', writer)
  Object.const_get(format('Wavefront::Writer::%<writer_class>s',
                          writer_class: writer.capitalize)).new(self)
rescue LoadError
  raise(Wavefront::Exception::UnsupportedWriter, writer)
end