class LogStash::Inputs::AzureWADTable

Constants

INITIAL_QUERY_SPLIT_PERIOD_MINUTES
TICKS_SINCE_EPOCH

Public Class Methods

new(*args)
Calls superclass method
# File lib/logstash/inputs/azurewadtable.rb, line 35
def initialize(*args)
  super(*args)
  if @collection_start_time_utc.nil?
    @collection_start_time_utc = (Time.now - ( 60 * @data_latency_minutes) - 60).iso8601
    @logger.debug("collection_start_time_utc = #{@collection_start_time_utc}")
  end
end

Public Instance Methods

build_latent_query()
# File lib/logstash/inputs/azurewadtable.rb, line 84
def build_latent_query
  @logger.debug("from #{@last_timestamp} to #{@until_timestamp}")
  if @last_timestamp > @until_timestamp
    @logger.debug("last_timestamp is in the future. Will not run any query!")
    return nil
  end
  query_filter = "(PartitionKey gt '#{partitionkey_from_datetime(@last_timestamp)}' and PartitionKey lt '#{partitionkey_from_datetime(@until_timestamp)}')"
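  # WAD may also write PartitionKeys prefixed with a zero-padded 19-digit counter
  # and "___" (an assumption about the table layout); include those prefixed
  # ranges for counters 0..99 as well.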
  for i in 0..99
    query_filter << " or (PartitionKey gt '#{i.to_s.rjust(19, '0')}___#{partitionkey_from_datetime(@last_timestamp)}' and PartitionKey lt '#{i.to_s.rjust(19, '0')}___#{partitionkey_from_datetime(@until_timestamp)}')"
  end # for block
  query_filter = query_filter.gsub('"','')
  return AzureQuery.new(@logger, @azure_table_service, @table_name, query_filter, @last_timestamp.to_s + "-" + @until_timestamp.to_s, @entity_count_to_process)
end
on_new_data(entity, output_queue, last_good_timestamp)
# File lib/logstash/inputs/azurewadtable.rb, line 140
def on_new_data(entity, output_queue, last_good_timestamp)
  #@logger.debug("new event")
  event = LogStash::Event.new(entity.properties)
  event.set("type", @table_name)

  # Help pretty print ETW files: replace %N placeholders in EventMessage with values parsed from Message
  if (@etw_pretty_print && !event.get("EventMessage").nil? && !event.get("Message").nil?)
    @logger.debug("event: " + event.to_s)
    eventMessage = event.get("EventMessage").to_s
    message = event.get("Message").to_s
    @logger.debug("EventMessage: " + eventMessage)
    @logger.debug("Message: " + message)
    if (eventMessage.include? "%")
      @logger.debug("starting pretty print")
      toReplace = eventMessage.scan(/%\d+/)
      payload = message.scan(/(?<!\\S)([a-zA-Z]+)=(\"[^\"]*\")(?!\\S)/)
      # Split up the format string to separate all of the numbers
      toReplace.each do |key|
        @logger.debug("Replacing key: " + key.to_s)
        index = key.scan(/\d+/).join.to_i
        newValue = payload[index - 1][1]
        @logger.debug("New Value: " + newValue)
        eventMessage[key] = newValue
      end # do block
      event.set("EventMessage", eventMessage)
      @logger.debug("pretty print end. result: " + event.get("EventMessage").to_s)
    end
  end
  decorate(event)
  if event.get('PreciseTimeStamp').is_a?(Time)
    event.set('PreciseTimeStamp', LogStash::Timestamp.new(event.get('PreciseTimeStamp')))
  end
  theTIMESTAMP = event.get('TIMESTAMP')
  if theTIMESTAMP.is_a?(LogStash::Timestamp)
    last_good_timestamp = theTIMESTAMP.to_iso8601
  elsif theTIMESTAMP.is_a?(Time)
    last_good_timestamp = theTIMESTAMP.iso8601
    event.set('TIMESTAMP', LogStash::Timestamp.new(theTIMESTAMP))
  else
    @logger.warn("Found result with invalid TIMESTAMP. " + event.to_hash.to_s)
  end
  output_queue << event
  return last_good_timestamp
end
partitionkey_from_datetime(time_string)

Windows Azure Diagnostics' algorithm for determining the partition key from a timestamp is as follows:

  1. Take the time in UTC, without seconds.

  2. Convert it into .NET ticks.

  3. Add a '0' prefix.
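
For illustration, a minimal sketch of the conversion (assuming TICKS_SINCE_EPOCH
offsets Unix time to the .NET epoch of 0001-01-01; the sample value below is
computed under that assumption and is not taken from the plugin source):

  require 'time'

  t = Time.parse("2017-06-01T12:34:56Z")
  t -= t.sec                                              # 1. drop the seconds
  ticks = t.to_i * 10_000_000 + 621_355_968_000_000_000   # 2. .NET ticks (assumed epoch offset)
  key = "0#{ticks}"                                       # 3. => "0636319172400000000"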

# File lib/logstash/inputs/azurewadtable.rb, line 189
def partitionkey_from_datetime(time_string)
  collection_time = Time.parse(time_string)
  if collection_time
    #@logger.debug("collection time parsed successfully #{collection_time}")
  else
    raise(ArgumentError, "Could not parse the time_string")
  end # if else block

  collection_time -= collection_time.sec
  ticks = to_ticks(collection_time)
  "0#{ticks}"
end
process(output_queue)
# File lib/logstash/inputs/azurewadtable.rb, line 98
def process(output_queue)
  @until_timestamp = (Time.now - (60 * @data_latency_minutes)).iso8601
  last_good_timestamp = nil

  # Split the first query so we don't fetch old data several times for no reason
  if @first_run
    @first_run = false
    diff = DateTime.iso8601(@until_timestamp).to_time - DateTime.iso8601(@last_timestamp).to_time
    if diff > INITIAL_QUERY_SPLIT_PERIOD_MINUTES * 60
      @logger.debug("Splitting initial query in two")
      original_until = @until_timestamp

      @until_timestamp = (DateTime.iso8601(@until_timestamp).to_time - INITIAL_QUERY_SPLIT_PERIOD_MINUTES * 60).iso8601

      query = build_latent_query
      @duplicate_detector.filter_duplicates(query, ->(entity) {
        on_new_data(entity, output_queue, last_good_timestamp)
      }, false)

      @last_timestamp = (DateTime.iso8601(@until_timestamp).to_time - 1).iso8601
      @until_timestamp = original_until
    end
  end

  query = build_latent_query
  filter_result = @duplicate_detector.filter_duplicates(query, ->(entity) {
    last_good_timestamp = on_new_data(entity, output_queue, last_good_timestamp)
  })

  if filter_result
    if (!last_good_timestamp.nil?)
      @last_timestamp = last_good_timestamp
    end
  else
    @logger.debug("No new results found.")
  end

rescue => e
  @logger.error("Oh My, An error occurred. Error:#{e}: Trace: #{e.backtrace}", :exception => e)
  raise
end
register()
# File lib/logstash/inputs/azurewadtable.rb, line 44
def register
  user_agent = "logstash-input-azurewadtable"
  user_agent << "/" << Gem.latest_spec_for("logstash-input-azurewadtable").version.to_s

  if @sas_token.nil?
    @client = Azure::Storage::Client.create(
      :storage_account_name => @account_name,
      :storage_access_key => @access_key,
      :storage_table_host => "https://#{@account_name}.table.#{@endpoint}",
      :user_agent_prefix => user_agent)
  else
    @client = Azure::Storage::Client.create(
      :storage_account_name => @account_name,
      :storage_sas_token => @sas_token,
      :storage_table_host => "https://#{@account_name}.table.#{@endpoint}",
      :user_agent_prefix => user_agent)
  end

  @azure_table_service = @client.table_client

  @last_timestamp = @collection_start_time_utc
  @idle_delay = @idle_delay_seconds
  @duplicate_detector = DuplicateDetector.new(@logger, @past_queries_count)
  @first_run = true
end
run(output_queue)
# File lib/logstash/inputs/azurewadtable.rb, line 71
def run(output_queue)
  while !stop?
    @logger.debug("Starting process method @" + Time.now.to_s);
    process(output_queue)
    @logger.debug("Starting delay of: " + @idle_delay.to_s + " seconds @" + Time.now.to_s);
    sleep @idle_delay
  end # while
end
teardown()
# File lib/logstash/inputs/azurewadtable.rb, line 81
def teardown
end
to_ticks(time_to_convert)

Converts a Time to .NET ticks (100-nanosecond intervals).

# File lib/logstash/inputs/azurewadtable.rb, line 203
def to_ticks(time_to_convert)
  #@logger.debug("Converting time to ticks")
  time_to_convert.to_i * 10000000 - TICKS_SINCE_EPOCH
end
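
For example, one tick is 100 nanoseconds, so a Unix timestamp of 1,496,320,440
(2017-06-01T12:34:00Z) scales to 14,963,204,400,000,000 ticks before the
TICKS_SINCE_EPOCH offset is applied.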