class Spectator::Publisher
Internal class used to publish measurements to an aggregator service
Constants
- ADD_OP
- MAX_OP
- OPS
- UNKNOWN_OP
Public Class Methods
new(registry)
click to toggle source
# File lib/spectator/registry.rb, line 116 def initialize(registry) @registry = registry @started = false @should_stop = false @frequency = registry.config[:frequency] || 5 @http = Http.new(registry) end
Public Instance Methods
append_measurement(payload, table, measure)
click to toggle source
Add a measurement to our payload table. The serialization for a measurement is:
- length of tags - indexes for the tags based on the string table - operation (add (0), max (10)) - floating point value
# File lib/spectator/registry.rb, line 232 def append_measurement(payload, table, measure) op = op_for_measurement(measure) common_tags = @registry.common_tags tags = measure.id.tags len = tags.length + 1 + common_tags.length payload.push(len) common_tags.each do |k, v| payload.push(table[k]) payload.push(table[v]) end tags.each do |k, v| payload.push(table[k]) payload.push(table[v]) end payload.push(table[:name]) payload.push(table[measure.id.name]) payload.push(op) payload.push(measure.value) end
build_string_table(measurements)
click to toggle source
Build a string table from the list of measurements Unique words are identified, and assigned a number starting from 0 based on their lexicographical order
# File lib/spectator/registry.rb, line 204 def build_string_table(measurements) common_tags = @registry.common_tags table = {} common_tags.each do |k, v| table[k] = 0 table[v] = 0 end table[:name] = 0 measurements.each do |m| table[m.id.name] = 0 m.id.tags.each do |k, v| table[k] = 0 table[v] = 0 end end keys = table.keys.sort keys.each_with_index do |str, index| table[str] = index end table end
op_for_measurement(measure)
click to toggle source
payload_for_measurements(measurements)
click to toggle source
Generate a payload from the list of measurements The payload is an array, with the number of elements in the string table The string table, and measurements
# File lib/spectator/registry.rb, line 255 def payload_for_measurements(measurements) table = build_string_table(measurements) payload = [] payload.push(table.length) strings = table.keys.sort payload.concat(strings) measurements.each { |m| append_measurement(payload, table, m) } payload end
publish()
click to toggle source
Publish loop:
send measurements to the aggregator endpoint ':uri', every ':frequency' seconds
# File lib/spectator/registry.rb, line 286 def publish clock = @registry.clock until @should_stop start = clock.wall_time Spectator.logger.info 'Publishing' send_metrics_now elapsed = clock.wall_time - start sleep @frequency - elapsed if elapsed < @frequency end Spectator.logger.info 'Stopping publishing thread' end
registry_measurements()
click to toggle source
Get a list of measurements that should be sent
# File lib/spectator/registry.rb, line 266 def registry_measurements @registry.measurements.select { |m| should_send(m) } end
send_metrics_now()
click to toggle source
Send the current measurements to our aggregator service
# File lib/spectator/registry.rb, line 271 def send_metrics_now ms = registry_measurements if ms.empty? Spectator.logger.debug 'No measurements to send' else payload = payload_for_measurements(ms) uri = @registry.config[:uri] Spectator.logger.info "Sending #{ms.length} measurements to #{uri}" @http.post_json(uri, payload) end end
should_send(measure)
click to toggle source
Gauges are sent if they have a value Counters if they have a number of increments greater than 0
# File lib/spectator/registry.rb, line 193 def should_send(measure) op = op_for_measurement(measure) return measure.value > 0 if op == ADD_OP return !measure.value.nan? if op == MAX_OP false end
should_start?()
click to toggle source
# File lib/spectator/registry.rb, line 124 def should_start? if @started Spectator.logger.info('Ignoring start request. ' \ 'Spectator registry already started') return false end @started = true uri = @registry.config[:uri] if uri.nil? || uri.empty? Spectator.logger.info('Ignoring start request since Spectator ' \ 'registry has no valid uri') return false end true end
start()
click to toggle source
Start publishing if the config is acceptable:
uri is non-nil or empty
# File lib/spectator/registry.rb, line 144 def start return unless should_start? Spectator.logger.info 'Starting Spectator registry' @should_stop = false @publish_thread = Thread.new do publish end end
stop()
click to toggle source
Stop publishing measurements
# File lib/spectator/registry.rb, line 156 def stop unless @started Spectator.logger.info('Attemping to stop Spectator ' \ 'without a previous call to start') return end @should_stop = true Spectator.logger.info('Stopping spectator') @publish_thread.kill if @publish_thread @started = false Spectator.logger.info('Sending last batch of metrics before exiting') send_metrics_now end