class LogStash::Inputs::Azurenlogtable

Generate a repeating message.

This plugin is intented only as an example.

Constants

TICKS_SINCE_EPOCH

Public Class Methods

new(*args) click to toggle source
Calls superclass method
# File lib/logstash/inputs/azurenlogtable.rb, line 32
def initialize(*args)
  super(*args)
  if @collection_start_time_utc.nil?
    @collection_start_time_utc = (Time.now - ( 60 * @data_latency_minutes) - 60).iso8601
    @logger.debug("collection_start_time_utc = #{@collection_start_time_utc}")
  end
end

Public Instance Methods

build_latent_query() click to toggle source
# File lib/logstash/inputs/azurenlogtable.rb, line 100
def build_latent_query
  @logger.debug("from #{@last_timestamp} to #{@until_timestamp}")
  if @last_timestamp > @until_timestamp
    @logger.debug("last_timestamp is in the future. Will not run any query!")
    return nil
  end
  query_filter = "(PartitionKey gt '#{@last_timestamp}' and PartitionKey lt '#{@until_timestamp}')"
  query_filter = query_filter.gsub('"','')
  return AzureQuery.new(@logger, @azure_table_service, @table_name, query_filter, @last_timestamp.to_s + "-" + @until_timestamp.to_s, @entity_count_to_process)
end
on_new_data(entity, output_queue, last_good_timestamp) click to toggle source
# File lib/logstash/inputs/azurenlogtable.rb, line 111
def on_new_data(entity, output_queue, last_good_timestamp)
  #@logger.debug("new event")
  
  event = LogStash::Event.new(entity.properties)
  event.set("type", @table_name)
  @logger.debug("new event:" + event.to_hash.to_s)

  decorate(event)
  last_good_timestamp = event.get('PartitionKey')
  output_queue << event
  return last_good_timestamp
end
partitionkey_from_datetime(time_string) click to toggle source

Windows Azure Diagnostic's algorithm for determining the partition key based on time is as follows:

  1. Take time in UTC without seconds.

  2. Convert it into .net ticks

  3. add a '0' prefix.

# File lib/logstash/inputs/azurenlogtable.rb, line 128
def partitionkey_from_datetime(time_string)
  if time_string.nil?
    @logger.warn("partitionkey_from_datetime with invalid time_string. ")
    collection_time = (Time.now - ( 60 * @data_latency_minutes) - 60)
  else
    begin
      collection_time = Time.parse(time_string)
    rescue => e  
      @logger.error("partitionkey_from_datetime fail with time_string =>" + time_string, :exception => e)
      collection_time = (Time.now - ( 60 * @data_latency_minutes) - 60)
    end
  end
  if collection_time
    #@logger.debug("collection time parsed successfully #{collection_time}")
  else
    raise(ArgumentError, "Could not parse the time_string => #{time_string}")
  end # if else block

  collection_time -= collection_time.sec
  ticks = to_ticks(collection_time)
  "0#{ticks}"
end
process(output_queue) click to toggle source
# File lib/logstash/inputs/azurenlogtable.rb, line 77
def process(output_queue)
  @until_timestamp = partitionkey_from_datetime(Time.now.iso8601)
  last_good_timestamp = nil

  log_count = 0

  query = build_latent_query
  query.reset
  query.run( ->(entity) {
    last_good_timestamp = on_new_data(entity, output_queue, last_good_timestamp)
    log_count += 1
  })
  
  @logger.debug("log total count => #{log_count}")
  if (!last_good_timestamp.nil?)
    @last_timestamp = last_good_timestamp
  end

rescue => e
  @logger.error("Oh My, An error occurred. Error:#{e}: Trace: #{e.backtrace}", :exception => e)
  raise
end
register() click to toggle source
# File lib/logstash/inputs/azurenlogtable.rb, line 41
def register
  user_agent = "logstash-input-azurenlogtable-0.1.0"
  
  if @sas_token.nil?
      @client = Azure::Storage::Client.create(
      :storage_account_name => @account_name,
      :storage_access_key => @access_key,
      :storage_table_host => "https://#{@account_name}.table.#{@endpoint}",
      :user_agent_prefix => user_agent)
  else
    @client = Azure::Storage::Client.create(
      :storage_account_name => @account_name,
      :storage_sas_token => @sas_token,
      :storage_table_host => "https://#{@account_name}.table.#{@endpoint}",
      :user_agent_prefix => user_agent)
  end

  @azure_table_service = @client.table_client
  @last_timestamp = partitionkey_from_datetime(@collection_start_time_utc)
  @idle_delay = @idle_delay_seconds
end
run(output_queue) click to toggle source
# File lib/logstash/inputs/azurenlogtable.rb, line 64
def run(output_queue)
  while !stop?
    @logger.debug("Starting process method @" + Time.now.to_s);
    begin
      process(output_queue)
    rescue => e
      @logger.error("process fail", :exception => e)
    end
    @logger.debug("Starting delay of: " + @idle_delay.to_s + " seconds @" + Time.now.to_s);
    sleep @idle_delay
  end # while
end
stop() click to toggle source
# File lib/logstash/inputs/azurenlogtable.rb, line 157
def stop
  # nothing to do in this case so it is not necessary to define stop
  # examples of common "stop" tasks:
  #  * close sockets (unblocking blocking reads/accepts)
  #  * cleanup temporary files
  #  * terminate spawned threads
end
to_ticks(time_to_convert) click to toggle source

Convert time to ticks

# File lib/logstash/inputs/azurenlogtable.rb, line 152
def to_ticks(time_to_convert)
  #@logger.debug("Converting time to ticks")
  time_to_convert.to_i * 10000000 - TICKS_SINCE_EPOCH
end