class GraphiteAPI::Buffer

Constants

CHARS_TO_BE_IGNORED
END_OF_STREAM
VALID_MESSAGE

Public Class Methods

new(options) click to toggle source
# File lib/graphite-api/buffer.rb, line 30
def initialize options
  @options = options
  @queue = Queue.new
  @streamer = Hash.new {|h,k| h[k] = ""}
  @cache = Cache::Memory.new options if options[:cache]
end

Public Instance Methods

<<(obj)
Alias for: push
inspect() click to toggle source
# File lib/graphite-api/buffer.rb, line 89
def inspect
  "#<GraphiteAPI::Buffer:%s @quque#size=%s @streamer=%s>" % 
    [ object_id, queue.size, streamer]
end
new_records?() click to toggle source
# File lib/graphite-api/buffer.rb, line 85
def new_records?
  !queue.empty?
end
pull(format = nil) click to toggle source
# File lib/graphite-api/buffer.rb, line 65
def pull format = nil
  data = nested_zero_hash

  counter = 0
  while new_records?
    break if ( counter += 1 ) > 1_000_000 # TODO: fix this
    hash = queue.pop
    time = normalize_time(hash[:time],options[:slice])
    hash[:metric].each { |k,v| data[time][k] += v.to_f }
  end
  
  data.map do |time, hash|
    hash.map do |key, value|
      value = cache.incr(time,key,value) if cache
      results = ["#{prefix}#{key}",("%.2f"%value).to_f, time]
      format == :string ? results.join(" ") : results
    end
  end.flatten(1)
end
push(obj) click to toggle source

Add records to buffer push({:metric => {'a' => 10},:time => Time.now})

# File lib/graphite-api/buffer.rb, line 57
def push obj
  debug [:buffer,:add, obj]
  queue.push obj
  nil
end
Also aliased as: <<
stream(message, client_id = nil) click to toggle source

this method isn't thread safe use push for multiple threads support

# File lib/graphite-api/buffer.rb, line 41
def stream message, client_id = nil
  message.gsub(/\t/,' ').each_char do |char|
    next if invalid_char? char
    streamer[client_id] += char 
    
    if closed_stream? streamer[client_id]
      if valid_stream_message? streamer[client_id]
        push stream_message_to_obj streamer[client_id]
      end
      streamer.delete client_id
    end
  end
end

Private Instance Methods

closed_stream?(string) click to toggle source
# File lib/graphite-api/buffer.rb, line 105
def closed_stream? string
  string[-1,1] == END_OF_STREAM
end
invalid_char?(char) click to toggle source
# File lib/graphite-api/buffer.rb, line 101
def invalid_char? char
  CHARS_TO_BE_IGNORED.include? char
end
prefix() click to toggle source
# File lib/graphite-api/buffer.rb, line 113
def prefix
  @prefix ||= if options[:prefix] and !options[:prefix].empty?
    Array(options[:prefix]).join('.') << '.'
  else
    ""
  end
end
stream_message_to_obj(message) click to toggle source
# File lib/graphite-api/buffer.rb, line 96
def stream_message_to_obj message
  parts = message.split
  {:metric => { parts[0] => parts[1] },:time => Time.at(parts[2].to_i) }
end
valid_stream_message?(message) click to toggle source
# File lib/graphite-api/buffer.rb, line 109
def valid_stream_message? message
  message =~ VALID_MESSAGE
end