class LogStash::Inputs::AzureWADTable
Constants
- INITIAL_QUERY_SPLIT_PERIOD_MINUTES
- TICKS_SINCE_EPOCH
Public Class Methods
new(*args)
Calls superclass method
# File lib/logstash/inputs/azurewadtable.rb, line 35
def initialize(*args)
  super(*args)
  if @collection_start_time_utc.nil?
    @collection_start_time_utc = (Time.now - (60 * @data_latency_minutes) - 60).iso8601
    @logger.debug("collection_start_time_utc = #{@collection_start_time_utc}")
  end
end
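When collection_start_time_utc is not configured, the constructor defaults it to "now minus the data-latency window minus one extra minute", serialized as ISO 8601. A standalone sketch of that default, with a hypothetical latency of 10 minutes:

  require 'time'

  data_latency_minutes = 10  # hypothetical; the real value comes from the plugin configuration

  # Same arithmetic as the constructor: step back by the latency window plus one minute.
  collection_start_time_utc = (Time.now - (60 * data_latency_minutes) - 60).iso8601
  puts collection_start_time_utc  # e.g. "2023-06-01T11:49:00+02:00"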
Public Instance Methods
build_latent_query()
# File lib/logstash/inputs/azurewadtable.rb, line 84
def build_latent_query
  @logger.debug("from #{@last_timestamp} to #{@until_timestamp}")
  if @last_timestamp > @until_timestamp
    @logger.debug("last_timestamp is in the future. Will not run any query!")
    return nil
  end

  query_filter = "(PartitionKey gt '#{partitionkey_from_datetime(@last_timestamp)}' and PartitionKey lt '#{partitionkey_from_datetime(@until_timestamp)}')"
  for i in 0..99
    query_filter << " or (PartitionKey gt '#{i.to_s.rjust(19, '0')}___#{partitionkey_from_datetime(@last_timestamp)}' and PartitionKey lt '#{i.to_s.rjust(19, '0')}___#{partitionkey_from_datetime(@until_timestamp)}')"
  end # for block
  query_filter = query_filter.gsub('"','')

  return AzureQuery.new(@logger, @azure_table_service, @table_name, query_filter, @last_timestamp.to_s + "-" + @until_timestamp.to_s, @entity_count_to_process)
end
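The returned query wraps one large OData disjunction: the plain '0'-prefixed partition-key range, repeated once for each zero-padded numeric prefix (0 through 99) that some WAD tables prepend to their partition keys. A sketch of the filter's shape, with hand-picked keys standing in for partitionkey_from_datetime's output:

  # Illustrative only: the two keys below are hypothetical partition keys.
  from_key  = "0638081280000000000"  # stands in for partitionkey_from_datetime(@last_timestamp)
  until_key = "0638081298000000000"  # stands in for partitionkey_from_datetime(@until_timestamp)

  query_filter = "(PartitionKey gt '#{from_key}' and PartitionKey lt '#{until_key}')"
  (0..99).each do |i|
    prefix = i.to_s.rjust(19, '0')   # "0000000000000000000" .. "0000000000000000099"
    query_filter << " or (PartitionKey gt '#{prefix}___#{from_key}' and PartitionKey lt '#{prefix}___#{until_key}')"
  end

  puts query_filter[0, 120]  # the base clause plus the start of the first prefixed clause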
on_new_data(entity, output_queue, last_good_timestamp)
# File lib/logstash/inputs/azurewadtable.rb, line 140
def on_new_data(entity, output_queue, last_good_timestamp)
  #@logger.debug("new event")
  event = LogStash::Event.new(entity.properties)
  event.set("type", @table_name)

  # Help pretty print etw files
  if (@etw_pretty_print && !event.get("EventMessage").nil? && !event.get("Message").nil?)
    @logger.debug("event: " + event.to_s)
    eventMessage = event.get("EventMessage").to_s
    message = event.get("Message").to_s
    @logger.debug("EventMessage: " + eventMessage)
    @logger.debug("Message: " + message)
    if (eventMessage.include? "%")
      @logger.debug("starting pretty print")
      toReplace = eventMessage.scan(/%\d+/)
      payload = message.scan(/(?<!\\S)([a-zA-Z]+)=(\"[^\"]*\")(?!\\S)/)
      # Split up the format string to seperate all of the numbers
      toReplace.each do |key|
        @logger.debug("Replacing key: " + key.to_s)
        index = key.scan(/\d+/).join.to_i
        newValue = payload[index - 1][1]
        @logger.debug("New Value: " + newValue)
        eventMessage[key] = newValue
      end # do block
      event.set("EventMessage", eventMessage)
      @logger.debug("pretty print end. result: " + event.get("EventMessage").to_s)
    end
  end
  decorate(event)

  if event.get('PreciseTimeStamp').is_a?(Time)
    event.set('PreciseTimeStamp', LogStash::Timestamp.new(event.get('PreciseTimeStamp')))
  end

  theTIMESTAMP = event.get('TIMESTAMP')
  if theTIMESTAMP.is_a?(LogStash::Timestamp)
    last_good_timestamp = theTIMESTAMP.to_iso8601
  elsif theTIMESTAMP.is_a?(Time)
    last_good_timestamp = theTIMESTAMP.iso8601
    event.set('TIMESTAMP', LogStash::Timestamp.new(theTIMESTAMP))
  else
    @logger.warn("Found result with invalid TIMESTAMP. " + event.to_hash.to_s)
  end

  output_queue << event

  return last_good_timestamp
end
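The ETW pretty-print branch replaces %1, %2, … placeholders in EventMessage with the quoted values scanned out of Message. A standalone sketch of that substitution on made-up field contents (the regular expression here is simplified from the one in the source):

  event_message = "Request %1 failed with status %2"   # hypothetical EventMessage value
  message       = 'reqId="42fe" status="503"'          # hypothetical Message value

  placeholders = event_message.scan(/%\d+/)             # ["%1", "%2"]
  payload      = message.scan(/([a-zA-Z]+)=("[^"]*")/)  # [["reqId", "\"42fe\""], ["status", "\"503\""]]

  placeholders.each do |key|
    index = key.scan(/\d+/).join.to_i           # "%1" -> 1, "%2" -> 2
    event_message[key] = payload[index - 1][1]  # splice the quoted value over the placeholder
  end

  puts event_message  # => Request "42fe" failed with status "503"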
partitionkey_from_datetime(time_string)
Windows Azure Diagnostics' algorithm for determining the partition key based on time is as follows (a worked example follows the source listing below):
- Take the time in UTC without seconds.
- Convert it into .NET ticks.
- Add a '0' prefix.
# File lib/logstash/inputs/azurewadtable.rb, line 189
def partitionkey_from_datetime(time_string)
  collection_time = Time.parse(time_string)
  if collection_time
    #@logger.debug("collection time parsed successfully #{collection_time}")
  else
    raise(ArgumentError, "Could not parse the time_string")
  end # if else block

  collection_time -= collection_time.sec

  ticks = to_ticks(collection_time)

  "0#{ticks}"
end
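A worked example of the three steps, computed without the plugin. The .NET tick offset used here (621_355_968_000_000_000, the tick count of the Unix epoch) is an assumption about what TICKS_SINCE_EPOCH encodes; see to_ticks below.

  require 'time'

  UNIX_EPOCH_TICKS = 621_355_968_000_000_000      # .NET ticks (100 ns units since 0001-01-01) at 1970-01-01

  t = Time.parse("2023-01-01T00:00:30Z")
  t -= t.sec                                      # 1. drop the seconds -> 2023-01-01T00:00:00Z
  ticks = t.to_i * 10_000_000 + UNIX_EPOCH_TICKS  # 2. convert to .NET ticks -> 638081280000000000
  partition_key = "0#{ticks}"                     # 3. prefix with '0' -> "0638081280000000000"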
process(output_queue)
# File lib/logstash/inputs/azurewadtable.rb, line 98
def process(output_queue)
  @until_timestamp = (Time.now - (60 * @data_latency_minutes)).iso8601
  last_good_timestamp = nil

  # split first query so we don't fetch old data several times for no reason
  if @first_run
    @first_run = false
    diff = DateTime.iso8601(@until_timestamp).to_time - DateTime.iso8601(@last_timestamp).to_time
    if diff > INITIAL_QUERY_SPLIT_PERIOD_MINUTES * 60
      @logger.debug("Splitting initial query in two")
      original_until = @until_timestamp
      @until_timestamp = (DateTime.iso8601(@until_timestamp).to_time - INITIAL_QUERY_SPLIT_PERIOD_MINUTES * 60).iso8601
      query = build_latent_query
      @duplicate_detector.filter_duplicates(query, ->(entity) {
        on_new_data(entity, output_queue, last_good_timestamp)
      }, false)
      @last_timestamp = (DateTime.iso8601(@until_timestamp).to_time - 1).iso8601
      @until_timestamp = original_until
    end
  end

  query = build_latent_query
  filter_result = @duplicate_detector.filter_duplicates(query, ->(entity) {
    last_good_timestamp = on_new_data(entity, output_queue, last_good_timestamp)
  })

  if filter_result
    if (!last_good_timestamp.nil?)
      @last_timestamp = last_good_timestamp
    end
  else
    @logger.debug("No new results found.")
  end

rescue => e
  @logger.error("Oh My, An error occurred. Error:#{e}: Trace: #{e.backtrace}", :exception => e)
  raise
end
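On the first pass only, a backlog larger than INITIAL_QUERY_SPLIT_PERIOD_MINUTES is drained in two queries: one for everything older than the split period, then one for the most recent slice, resuming one second before the first query ended. A rough sketch of where the boundaries fall, with a hypothetical 30-minute split period and a 12-hour backlog:

  require 'time'
  require 'date'

  split_period_minutes = 30                  # hypothetical; the real value is the plugin constant
  last_timestamp  = "2023-01-01T00:00:00Z"   # hypothetical saved position, 12 hours back
  until_timestamp = "2023-01-01T12:00:00Z"

  diff = DateTime.iso8601(until_timestamp).to_time - DateTime.iso8601(last_timestamp).to_time
  if diff > split_period_minutes * 60
    first_until = (DateTime.iso8601(until_timestamp).to_time - split_period_minutes * 60).iso8601
    second_from = (DateTime.iso8601(first_until).to_time - 1).iso8601
    puts "query 1: #{last_timestamp} .. #{first_until}"   # .. 2023-01-01T11:30:00+00:00
    puts "query 2: #{second_from} .. #{until_timestamp}"  # 2023-01-01T11:29:59+00:00 ..
  end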
register()
# File lib/logstash/inputs/azurewadtable.rb, line 44
def register
  user_agent = "logstash-input-azurewadtable"
  user_agent << "/" << Gem.latest_spec_for("logstash-input-azurewadtable").version.to_s

  if @sas_token.nil?
    @client = Azure::Storage::Client.create(
      :storage_account_name => @account_name,
      :storage_access_key => @access_key,
      :storage_table_host => "https://#{@account_name}.table.#{@endpoint}",
      :user_agent_prefix => user_agent)
  else
    @client = Azure::Storage::Client.create(
      :storage_account_name => @account_name,
      :storage_sas_token => @sas_token,
      :storage_table_host => "https://#{@account_name}.table.#{@endpoint}",
      :user_agent_prefix => user_agent)
  end
  @azure_table_service = @client.table_client

  @last_timestamp = @collection_start_time_utc
  @idle_delay = @idle_delay_seconds
  @duplicate_detector = DuplicateDetector.new(@logger, @past_queries_count)
  @first_run = true
end
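A minimal sketch of a pipeline configuration that would drive this registration path. The plugin name and option names here mirror the gem name and the instance variables used above; they are assumptions, not taken from this listing, and all values are placeholders.

  input {
    azurewadtable {
      account_name => "mystorageaccount"       # placeholder storage account
      access_key   => "<storage access key>"   # or sas_token => "<sas token>" instead
      table_name   => "WADLogsTable"           # the WAD table to poll
      data_latency_minutes => 5                # assumed option; how far behind 'now' to query
    }
  }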
run(output_queue)
# File lib/logstash/inputs/azurewadtable.rb, line 71
def run(output_queue)
  while !stop?
    @logger.debug("Starting process method @" + Time.now.to_s);
    process(output_queue)
    @logger.debug("Starting delay of: " + @idle_delay.to_s + " seconds @" + Time.now.to_s);
    sleep @idle_delay
  end # while
end
teardown()
# File lib/logstash/inputs/azurewadtable.rb, line 81
def teardown
end
to_ticks(time_to_convert)
Converts a Time value to .NET ticks (100-nanosecond intervals counted from 0001-01-01).
# File lib/logstash/inputs/azurewadtable.rb, line 203
def to_ticks(time_to_convert)
  #@logger.debug("Converting time to ticks")
  time_to_convert.to_i * 10000000 - TICKS_SINCE_EPOCH
end
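A quick sanity check of the arithmetic, under the assumption (not visible in this listing) that TICKS_SINCE_EPOCH is defined as Time.utc(1, 1, 1).to_i * 10_000_000, i.e. the negative .NET tick value of the Unix epoch:

  ticks_since_epoch = Time.utc(1, 1, 1).to_i * 10_000_000    # assumed definition; -621_355_968_000_000_000
  Time.utc(1970, 1, 1).to_i * 10_000_000 - ticks_since_epoch # => 621_355_968_000_000_000, .NET ticks at 1970-01-01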