class LogStash::Outputs::Chronix

Public Instance Methods

almostEquals(delta, prevDelta) click to toggle source

checks if two offsets are almost equals

# File lib/logstash/outputs/chronix.rb, line 162
def almostEquals(delta, prevDelta)
  diff = (delta - prevDelta).abs

  return (diff <= @threshold)
end
connectToChronix() click to toggle source

open the connection to chronix

# File lib/logstash/outputs/chronix.rb, line 54
def connectToChronix
  @url = "http://" + @host + ":" + @port + @path
  @solr = RSolr.connect :url => @url
end
createChronixPoint(delta, value, type = "") click to toggle source
# File lib/logstash/outputs/chronix.rb, line 147
def createChronixPoint(delta, value, type = "")
  if type == "strace"
    return Chronix::StracePoint.new( :t => delta, :v => value )
  else
    return Chronix::Point.new( :t => delta, :v => value )
  end
end
createPointHash(events) click to toggle source

this method iterates through all events and creates a hash with different lists of points sorted by metric

# File lib/logstash/outputs/chronix.rb, line 81
def createPointHash(events)
  pointHash = Hash.new

  # add each event to our hash, sorted by metrics as key
  events.each do |event|

    eventData = event.to_hash()

    # format the timestamp to unix format
    timestamp = DateTime.iso8601("#{eventData["@timestamp"]}").to_time.to_i
    metric = eventData["metric"]

    # if there is no list for the current metric -> create a new one
    if pointHash[metric] == nil
      if eventData["chronix_type"] == "strace"
        pointHash[metric] = {"startTime" => timestamp, "lastTimestamp" => 0, "points" => Chronix::StracePoints.new, "prevDelta" => 0, "timeSinceLastDelta" => 0, "lastStoredDate" => timestamp}
      else
        pointHash[metric] = {"startTime" => timestamp, "lastTimestamp" => 0, "points" => Chronix::Points.new, "prevDelta" => 0, "timeSinceLastDelta" => 0, "lastStoredDate" => timestamp}
      end
    end

    if pointHash[metric]["lastTimestamp"] == 0
      delta = 0
    else
      delta = timestamp - pointHash[metric]["lastTimestamp"]
    end

    if (almostEquals(delta, pointHash[metric]["prevDelta"]) && noDrift(timestamp, pointHash[metric]["lastStoredDate"], pointHash[metric]["timeSinceLastDelta"]))
      # insert the current point in our list
      pointHash[metric]["points"].p << createChronixPoint(0, eventData["value"], eventData["chronix_type"])

      pointHash[metric]["timeSinceLastDelta"] += 1

    else
      # insert the current point in our list
      pointHash[metric]["points"].p << createChronixPoint(delta, eventData["value"], eventData["chronix_type"])

      pointHash[metric]["timeSinceLastDelta"] = 1
      pointHash[metric]["lastStoredDate"] = timestamp
    end

    # save current timestamp as lastTimestamp and the previousOffset
    pointHash[metric]["lastTimestamp"] = timestamp
    pointHash[metric]["prevDelta"] = delta

  end #end do

  return pointHash
end
createSolrDocument(metric, phash) click to toggle source
# File lib/logstash/outputs/chronix.rb, line 155
def createSolrDocument(metric, phash)
  endTime = phash["lastTimestamp"] # maybe use startTime + delta here?!
  # TODO add more meta-data
  return { :metric => metric, :start => phash["startTime"], :end => endTime, :data => zipAndEncode(phash["points"]) }
end
flush(events, close=false) click to toggle source
# File lib/logstash/outputs/chronix.rb, line 65
def flush(events, close=false)
  pointHash = createPointHash(events)

  documents = []

  # iterate through pointHash and create a solr document
  pointHash.each { |metric, phash|
    documents << createSolrDocument(metric, phash)
  }

  # send to chronix
  @solr.add documents
  @solr.update :data => '<commit/>'
end
noDrift(timestamp, lastStoredDate, timeSinceLastDelta) click to toggle source

checks if there is a drift

# File lib/logstash/outputs/chronix.rb, line 169
def noDrift(timestamp, lastStoredDate, timeSinceLastDelta)
  calcMaxOffset = @threshold * timeSinceLastDelta
  drift = lastStoredDate + calcMaxOffset - timestamp.to_i

  return (drift <= (@threshold / 2))
end
receive(event) click to toggle source
# File lib/logstash/outputs/chronix.rb, line 60
def receive(event)
  buffer_receive(event)
end
register() click to toggle source
# File lib/logstash/outputs/chronix.rb, line 42
def register
  # initialize the buffer
  buffer_initialize(
    :max_items => @flush_size,
    :max_interval => @idle_flush_time,
    :logger => @logger
  )

  connectToChronix
end
zipAndEncode(points) click to toggle source

this method zips and base64 encodes the list of points

# File lib/logstash/outputs/chronix.rb, line 132
def zipAndEncode(points)
  # encode protobuf-list
  proto_bytes = points.encode
  string_io = StringIO.new("w")

  # compress the encoded protobuf-list
  gz = Zlib::GzipWriter.new(string_io)
  gz.write(proto_bytes)
  gz.close
  data = string_io.string

  # encode base64 (without \n)
  return Base64.strict_encode64(data)
end