class WavefrontCli::Write
Send points via any method supported by the SDK
Constants
- SPLIT_PATTERN
Attributes
Public Instance Methods
I chose to prioritise UI consistency over internal elegance here. The `write` command doesn't follow the age-old assumption that each command maps 1:1 to a similarly named SDK class. Write
can use `write` or `distribution`.
# File lib/wavefront-cli/write.rb, line 94 def _sdk_class return 'Wavefront::Distribution' if distribution? 'Wavefront::Write' end
A wrapper which lets us send normal points, deltas, or distributions
# File lib/wavefront-cli/write.rb, line 180 def call_write(data, openclose = true) if options[:delta] wf.write_delta(data, openclose) else wf.write(data, openclose) end end
# File lib/wavefront-cli/write.rb, line 143 def close_connection wf.close end
# File lib/wavefront-cli/write.rb, line 114 def default_port distribution? ? 40_000 : 2878 end
# File lib/wavefront-cli/write.rb, line 100 def distribution? return true if options[:distribution] options[:infileformat]&.include?('d') end
# File lib/wavefront-cli/write.rb, line 36 def do_distribution send_point(make_distribution_point(tags_to_hash(options[:tag]))) end
rubocop:enable Metrics/AbcSize
# File lib/wavefront-cli/write.rb, line 30 def do_file valid_format?(options[:infileformat]) setup_fmt(options[:infileformat] || 'tmv') process_input(options[:'<file>']) end
# File lib/wavefront-cli/write.rb, line 40 def do_noise loop do do_point(random_value(options[:min] || -10, options[:max] || 10)) sleep(sleep_time) end end
rubocop:disable Metrics/AbcSize
# File lib/wavefront-cli/write.rb, line 17 def do_point(value = options[:'<value>']) tags = tags_to_hash(options[:tag]) p = { path: options[:'<metric>'], value: sane_value(value) } p[:tags] = tags unless tags.empty? p[:source] = options[:host] if options[:host] p[:ts] = parse_time(options[:time]) if options[:time] send_point(p) end
Make sure we have the right number of columns, according to the format string. We want to take every precaution we can to stop users accidentally polluting their metric namespace with junk.
@param line [String] input line @return [True] if the number of fields is correct @raise WavefrontCli::Exception::UnparseableInput
if there
are not the right number of fields.
# File lib/wavefront-cli/write.rb, line 409 def enough_fields?(line) ncols = line.split(SPLIT_PATTERN).length return true if fmt.include?('T') && ncols >= fmt.length return true if ncols == fmt.length raise(WavefrontCli::Exception::UnparseableInput, format('Expected %<expected>s fields, got %<got>s', expected: fmt.length, got: ncols)) end
We will let users write a distribution as '1 1 1' or '3x1' or even a mix of the two
# File lib/wavefront-cli/write.rb, line 223 def expand_dist(dist) dist.map do |v| if v.is_a?(String) && v.include?('x') x, val = v.split('x', 2) Array.new(x.to_i, val.to_f) else v.to_f end end.flatten end
# File lib/wavefront-cli/write.rb, line 85 def extra_options options[:using] ? { writer: options[:using] } : {} end
Find and return the metric path in a chunked line of input. The path can be in the data, or passed as an option, or both. If the latter, then we assume the option is a prefix, and concatenate the value in the data.
param chunks [Array] a chunked line of input from process_line
return [String] the metric path raise TypeError if field does not exist
# File lib/wavefront-cli/write.rb, line 264 def extract_path(chunks) m = chunks[fmt.index('m')] options[:metric] ? [options[:metric], m].join('.') : m rescue TypeError return options[:metric] if options[:metric] raise end
Find and return the source in a chunked line of input.
param chunks [Array] a chunked line of input from process_line
return [String] the source, if it is there, or if not, the
value passed through by -H, or the local hostname.
# File lib/wavefront-cli/write.rb, line 279 def extract_source(chunks) chunks[fmt.index('s')] rescue TypeError options[:source] || Socket.gethostname end
Find and return the source in a chunked line of input.
@param chunks [Array] a chunked line of input from process_line
@return [Float] the timestamp, if it is there, or the current
UTC time if it is not.
# File lib/wavefront-cli/write.rb, line 240 def extract_ts(chunks) ts = chunks[fmt.index('t')] return parse_time(ts) if valid_timestamp?(ts) rescue TypeError Time.now.utc.to_i end
Find and return the value in a chunked line of input
param chunks [Array] a chunked line of input from process_line
return [Float] the value raise TypeError if field does not exist raise Wavefront::Exception::InvalidValue if it's not a value
# File lib/wavefront-cli/write.rb, line 209 def extract_value(chunks) if fmt.include?('v') v = chunks[fmt.index('v')] v.to_f else raw = chunks[fmt.index('d')].split(',') xpanded = expand_dist(raw) wf.mk_distribution(xpanded) end end
# File lib/wavefront-cli/write.rb, line 370 def format_string_does_not_have_v_and_d?(fmt) return true unless fmt.include?('v') && fmt.include?('d') raise(WavefrontCli::Exception::UnparseableInput, "'v' and 'd' are mutually exclusive") end
# File lib/wavefront-cli/write.rb, line 391 def format_string_has_big_t_only_at_the_end?(fmt) return true unless fmt.include?('T') return true if fmt.end_with?('T') raise(WavefrontCli::Exception::UnparseableInput, "if used, 'T' must come at end of format string") end
# File lib/wavefront-cli/write.rb, line 384 def format_string_has_unique_chars?(fmt) return true if fmt.chars.sort == fmt.chars.uniq.sort raise(WavefrontCli::Exception::UnparseableInput, 'repeated field in format string') end
# File lib/wavefront-cli/write.rb, line 363 def format_string_has_v_or_d?(fmt) return true if fmt.include?('v') || fmt.include?('d') raise(WavefrontCli::Exception::UnparseableInput, "format string must include 'v' or 'd'") end
# File lib/wavefront-cli/write.rb, line 377 def format_string_is_all_valid_chars?(fmt) return true if /^[dmstTv]+$/.match?(fmt) raise(WavefrontCli::Exception::UnparseableInput, 'unsupported field in format string') end
# File lib/wavefront-cli/write.rb, line 438 def load_data(file) IO.read(file) rescue StandardError raise WavefrontCli::Exception::FileNotFound end
rubocop:disable Metrics/AbcSize
# File lib/wavefront-cli/write.rb, line 66 def make_distribution_point(tags) { path: options[:'<metric>'], interval: options[:interval] || 'M', tags: tags, value: mk_dist }.tap do |p| p[:source] = options[:host] if options[:host] p[:ts] = parse_time(options[:time]) if options[:time] end end
# File lib/wavefront-cli/write.rb, line 106 def mk_creds { proxy: options[:proxy], port: options[:port] || default_port, socket: options[:socket], endpoint: options[:endpoint], token: options[:token] } end
Turn our user's representation of a distribution into one which suits Wavefront. The SDK can do this for us.
# File lib/wavefront-cli/write.rb, line 80 def mk_dist xpanded = expand_dist(options[:'<val>']) wf.mk_distribution(xpanded.map(&:to_f)) end
# File lib/wavefront-cli/write.rb, line 139 def open_connection wf.open end
Read the input, from a file or from STDIN, and turn each line into Wavefront points.
# File lib/wavefront-cli/write.rb, line 156 def process_input(file) if file == '-' read_stdin else call_write( process_input_file(load_data(Pathname.new(file)).split("\n")) ) end end
@param data [Array] array of lines
# File lib/wavefront-cli/write.rb, line 168 def process_input_file(data) data.each_with_object([]) do |l, a| a.<< process_line(l) rescue WavefrontCli::Exception::UnparseableInput => e puts "Bad input. #{e.message}." next end end
Process a line of input, as described by the format string held in @fmt. Produces a hash suitable for the SDK to send on.
We let the user define most of the fields, but anything beyond what they define is always assumed to be point tags. This is because you can have arbitrarily many of those for each point.
@param line [String] a line of an input file @return [Hash] @raise WavefrontCli::Exception::UnparseableInput
if the line
doesn't look right
rubocop:disable Metrics/MethodLength rubocop:disable Metrics/AbcSize
# File lib/wavefront-cli/write.rb, line 299 def process_line(line) return true if line.empty? chunks = line.split(SPLIT_PATTERN, fmt.length) enough_fields?(line) # can raise exception point = { path: extract_path(chunks), value: extract_value(chunks) } tags = line_tags(chunks) point.tap do |p| p[:tags] = tags unless tags.empty? p[:ts] = extract_ts(chunks) if fmt.include?('t') p[:source] = extract_source(chunks) if fmt.include?('s') p[:interval] = options[:interval] || 'm' if fmt.include?('d') end end
# File lib/wavefront-cli/write.rb, line 47 def random_value(min, max) return min if min == max rand(max.to_f - min.to_f) + min.to_f end
Read from standard in and stream points through an open socket. If the user hits ctrl-c, close the socket and exit politely.
# File lib/wavefront-cli/write.rb, line 192 def read_stdin open_connection $stdin.each_line { |l| call_write(process_line(l.strip), false) } close_connection rescue SystemExit, Interrupt puts 'ctrl-c. Exiting.' wf.close exit 0 end
# File lib/wavefront-cli/write.rb, line 53 def sane_value(value) return value if value.is_a?(Numeric) raise WavefrontCli::Exception::InvalidValue unless value.is_a?(String) value.delete('\\').to_f end
# File lib/wavefront-cli/write.rb, line 147 def send_point(point) call_write(point) rescue Wavefront::Exception::InvalidEndpoint abort format("Could not connect to proxy '%<proxy>s:%<port>s'.", wf.creds) end
# File lib/wavefront-cli/write.rb, line 434 def setup_fmt(fmt) @fmt = fmt.split('') end
# File lib/wavefront-cli/write.rb, line 61 def sleep_time options[:interval] ? options[:interval].to_f : 1 end
The format string must contain values. They can be single values or distributions. So we must have 'v' xor 'd'. It must not contain anything other than 'm', 't', 'T', 's', 'd', or 'v', and the 'T', if there, must be at the end. No letter must appear more than once.
@param fmt [String] format of input file
# File lib/wavefront-cli/write.rb, line 355 def valid_format?(fmt) format_string_has_v_or_d?(fmt) format_string_does_not_have_v_and_d?(fmt) format_string_is_all_valid_chars?(fmt) format_string_has_unique_chars?(fmt) format_string_has_big_t_only_at_the_end?(fmt) end
Although the SDK does value checking, we'll add another layer of input checking here. See if the time looks valid. We'll assume anything before 2000/01/01 or after a year from now is wrong. Arbitrary, but there has to be a cut-off somewhere. @param timestamp [String, Integer] epoch timestamp @return [Bool]
# File lib/wavefront-cli/write.rb, line 427 def valid_timestamp?(timestamp) (timestamp.is_a?(Integer) || timestamp.is_a?(String) && timestamp.match(/^\d+$/)) && timestamp.to_i > 946_684_800 && timestamp.to_i < (Time.now.to_i + 31_557_600) end
# File lib/wavefront-cli/write.rb, line 118 def validate_opts validate_opts_file if options[:file] if options[:using] == 'unix' return true if options[:socket] raise(WavefrontCli::Exception::CredentialError, 'No socket path.') end return true if options[:proxy] raise(WavefrontCli::Exception::CredentialError, 'No proxy address.') end
# File lib/wavefront-cli/write.rb, line 132 def validate_opts_file return true if options[:metric] || options[:infileformat]&.include?('m') raise(WavefrontCli::Exception::InsufficientData, "Supply a metric path in the file or with '-m'.") end