class LogStash::Inputs::Azurenlogtable
Generate a repeating message.
This plugin is intented only as an example.
Constants
- TICKS_SINCE_EPOCH
Public Class Methods
new(*args)
click to toggle source
Calls superclass method
# File lib/logstash/inputs/azurenlogtable.rb, line 32 def initialize(*args) super(*args) if @collection_start_time_utc.nil? @collection_start_time_utc = (Time.now - ( 60 * @data_latency_minutes) - 60).iso8601 @logger.debug("collection_start_time_utc = #{@collection_start_time_utc}") end end
Public Instance Methods
build_latent_query()
click to toggle source
# File lib/logstash/inputs/azurenlogtable.rb, line 100 def build_latent_query @logger.debug("from #{@last_timestamp} to #{@until_timestamp}") if @last_timestamp > @until_timestamp @logger.debug("last_timestamp is in the future. Will not run any query!") return nil end query_filter = "(PartitionKey gt '#{@last_timestamp}' and PartitionKey lt '#{@until_timestamp}')" query_filter = query_filter.gsub('"','') return AzureQuery.new(@logger, @azure_table_service, @table_name, query_filter, @last_timestamp.to_s + "-" + @until_timestamp.to_s, @entity_count_to_process) end
on_new_data(entity, output_queue, last_good_timestamp)
click to toggle source
# File lib/logstash/inputs/azurenlogtable.rb, line 111 def on_new_data(entity, output_queue, last_good_timestamp) #@logger.debug("new event") event = LogStash::Event.new(entity.properties) event.set("type", @table_name) @logger.debug("new event:" + event.to_hash.to_s) decorate(event) last_good_timestamp = event.get('PartitionKey') output_queue << event return last_good_timestamp end
partitionkey_from_datetime(time_string)
click to toggle source
Windows Azure Diagnostic's algorithm for determining the partition key based on time is as follows:
-
Take time in UTC without seconds.
-
Convert it into .net ticks
-
add a '0' prefix.
# File lib/logstash/inputs/azurenlogtable.rb, line 128 def partitionkey_from_datetime(time_string) if time_string.nil? @logger.warn("partitionkey_from_datetime with invalid time_string. ") collection_time = (Time.now - ( 60 * @data_latency_minutes) - 60) else begin collection_time = Time.parse(time_string) rescue => e @logger.error("partitionkey_from_datetime fail with time_string =>" + time_string, :exception => e) collection_time = (Time.now - ( 60 * @data_latency_minutes) - 60) end end if collection_time #@logger.debug("collection time parsed successfully #{collection_time}") else raise(ArgumentError, "Could not parse the time_string => #{time_string}") end # if else block collection_time -= collection_time.sec ticks = to_ticks(collection_time) "0#{ticks}" end
process(output_queue)
click to toggle source
# File lib/logstash/inputs/azurenlogtable.rb, line 77 def process(output_queue) @until_timestamp = partitionkey_from_datetime(Time.now.iso8601) last_good_timestamp = nil log_count = 0 query = build_latent_query query.reset query.run( ->(entity) { last_good_timestamp = on_new_data(entity, output_queue, last_good_timestamp) log_count += 1 }) @logger.debug("log total count => #{log_count}") if (!last_good_timestamp.nil?) @last_timestamp = last_good_timestamp end rescue => e @logger.error("Oh My, An error occurred. Error:#{e}: Trace: #{e.backtrace}", :exception => e) raise end
register()
click to toggle source
# File lib/logstash/inputs/azurenlogtable.rb, line 41 def register user_agent = "logstash-input-azurenlogtable-0.1.0" if @sas_token.nil? @client = Azure::Storage::Client.create( :storage_account_name => @account_name, :storage_access_key => @access_key, :storage_table_host => "https://#{@account_name}.table.#{@endpoint}", :user_agent_prefix => user_agent) else @client = Azure::Storage::Client.create( :storage_account_name => @account_name, :storage_sas_token => @sas_token, :storage_table_host => "https://#{@account_name}.table.#{@endpoint}", :user_agent_prefix => user_agent) end @azure_table_service = @client.table_client @last_timestamp = partitionkey_from_datetime(@collection_start_time_utc) @idle_delay = @idle_delay_seconds end
run(output_queue)
click to toggle source
# File lib/logstash/inputs/azurenlogtable.rb, line 64 def run(output_queue) while !stop? @logger.debug("Starting process method @" + Time.now.to_s); begin process(output_queue) rescue => e @logger.error("process fail", :exception => e) end @logger.debug("Starting delay of: " + @idle_delay.to_s + " seconds @" + Time.now.to_s); sleep @idle_delay end # while end
stop()
click to toggle source
# File lib/logstash/inputs/azurenlogtable.rb, line 157 def stop # nothing to do in this case so it is not necessary to define stop # examples of common "stop" tasks: # * close sockets (unblocking blocking reads/accepts) # * cleanup temporary files # * terminate spawned threads end
to_ticks(time_to_convert)
click to toggle source
Convert time to ticks
# File lib/logstash/inputs/azurenlogtable.rb, line 152 def to_ticks(time_to_convert) #@logger.debug("Converting time to ticks") time_to_convert.to_i * 10000000 - TICKS_SINCE_EPOCH end