class LogStash::Outputs::Chronix
Public Instance Methods
almostEquals(delta, prevDelta)
click to toggle source
Checks whether two offsets are almost equal (within the configured threshold).
# File lib/logstash/outputs/chronix.rb, line 162
# Returns true when the two deltas differ by no more than the
# configured @threshold, i.e. they are considered "almost equal".
def almostEquals(delta, prevDelta)
  (delta - prevDelta).abs <= @threshold
end
connectToChronix()
click to toggle source
Opens the connection to the Chronix (Solr) backend.
# File lib/logstash/outputs/chronix.rb, line 54 def connectToChronix @url = "http://" + @host + ":" + @port + @path @solr = RSolr.connect :url => @url end
createChronixPoint(delta, value, type = "")
click to toggle source
# File lib/logstash/outputs/chronix.rb, line 147
# Wraps a (delta, value) pair in the protobuf point type matching the
# given chronix type: StracePoint for "strace", plain Point otherwise.
def createChronixPoint(delta, value, type = "")
  point_class = type == "strace" ? Chronix::StracePoint : Chronix::Point
  point_class.new(:t => delta, :v => value)
end
createPointHash(events)
click to toggle source
Iterates through all events and builds a hash of point lists, grouped by metric name.
# File lib/logstash/outputs/chronix.rb, line 81 def createPointHash(events) pointHash = Hash.new # add each event to our hash, sorted by metrics as key events.each do |event| eventData = event.to_hash() # format the timestamp to unix format timestamp = DateTime.iso8601("#{eventData["@timestamp"]}").to_time.to_i metric = eventData["metric"] # if there is no list for the current metric -> create a new one if pointHash[metric] == nil if eventData["chronix_type"] == "strace" pointHash[metric] = {"startTime" => timestamp, "lastTimestamp" => 0, "points" => Chronix::StracePoints.new, "prevDelta" => 0, "timeSinceLastDelta" => 0, "lastStoredDate" => timestamp} else pointHash[metric] = {"startTime" => timestamp, "lastTimestamp" => 0, "points" => Chronix::Points.new, "prevDelta" => 0, "timeSinceLastDelta" => 0, "lastStoredDate" => timestamp} end end if pointHash[metric]["lastTimestamp"] == 0 delta = 0 else delta = timestamp - pointHash[metric]["lastTimestamp"] end if (almostEquals(delta, pointHash[metric]["prevDelta"]) && noDrift(timestamp, pointHash[metric]["lastStoredDate"], pointHash[metric]["timeSinceLastDelta"])) # insert the current point in our list pointHash[metric]["points"].p << createChronixPoint(0, eventData["value"], eventData["chronix_type"]) pointHash[metric]["timeSinceLastDelta"] += 1 else # insert the current point in our list pointHash[metric]["points"].p << createChronixPoint(delta, eventData["value"], eventData["chronix_type"]) pointHash[metric]["timeSinceLastDelta"] = 1 pointHash[metric]["lastStoredDate"] = timestamp end # save current timestamp as lastTimestamp and the previousOffset pointHash[metric]["lastTimestamp"] = timestamp pointHash[metric]["prevDelta"] = delta end #end do return pointHash end
createSolrDocument(metric, phash)
click to toggle source
# File lib/logstash/outputs/chronix.rb, line 155
# Turns one metric's accumulated state hash into a Solr document:
# metric name, start/end timestamps and the compressed point data.
def createSolrDocument(metric, phash)
  # end is the last seen timestamp; maybe use startTime + delta here?!
  # TODO add more meta-data
  {
    :metric => metric,
    :start  => phash["startTime"],
    :end    => phash["lastTimestamp"],
    :data   => zipAndEncode(phash["points"])
  }
end
flush(events, close=false)
click to toggle source
# File lib/logstash/outputs/chronix.rb, line 65
# Flushes the buffered events: builds one Solr document per metric and
# pushes them to Chronix, followed by an explicit commit.
def flush(events, close=false)
  # iterate through the point hash and create a solr document per metric
  documents = createPointHash(events).map do |metric, phash|
    createSolrDocument(metric, phash)
  end

  # send to chronix
  @solr.add documents
  @solr.update :data => '<commit/>'
end
noDrift(timestamp, lastStoredDate, timeSinceLastDelta)
click to toggle source
Checks whether the accumulated timestamp error has drifted too far from the last stored date.
# File lib/logstash/outputs/chronix.rb, line 169
# Returns true while the accumulated timestamp error stays small enough:
# the worst-case offset since the last stored date (@threshold per stored
# 0-delta) minus the actual elapsed time must not exceed half a threshold.
def noDrift(timestamp, lastStoredDate, timeSinceLastDelta)
  maxOffset = @threshold * timeSinceLastDelta
  drift = (lastStoredDate + maxOffset) - timestamp.to_i
  drift <= @threshold / 2
end
receive(event)
click to toggle source
# File lib/logstash/outputs/chronix.rb, line 60 def receive(event) buffer_receive(event) end
register()
click to toggle source
# File lib/logstash/outputs/chronix.rb, line 42
# Plugin entry point: sets up the event buffer (flush by size or idle
# interval) and opens the connection to Chronix.
def register
  # initialize the buffer
  buffer_options = {
    :max_items    => @flush_size,
    :max_interval => @idle_flush_time,
    :logger       => @logger
  }
  buffer_initialize(buffer_options)
  connectToChronix
end
zipAndEncode(points)
click to toggle source
Gzip-compresses and Base64-encodes the serialized list of points.
# File lib/logstash/outputs/chronix.rb, line 132
# Serializes the protobuf point list, gzip-compresses the bytes and
# returns them Base64-encoded (strict encoding, i.e. no "\n").
#
# points - object responding to #encode (Chronix::Points / StracePoints)
#
# Returns the compressed, Base64-encoded data as a String.
def zipAndEncode(points)
  # encode protobuf-list
  proto_bytes = points.encode

  # compress the encoded protobuf-list
  # FIX: was StringIO.new("w") — "w" is the *initial buffer content*,
  # not a file mode, and only worked because the gzip header overwrote
  # it; an empty StringIO states the intent correctly.
  string_io = StringIO.new
  gz = Zlib::GzipWriter.new(string_io)
  gz.write(proto_bytes)
  gz.close
  data = string_io.string

  # encode base64 (without \n)
  return Base64.strict_encode64(data)
end