class LogStash::Outputs::Application_insights::Shutdown_recovery

Public Class Methods

instance() click to toggle source
# File lib/logstash/outputs/application_insights/shutdown_recovery.rb, line 134
def self.instance
  @@instance
end

Private Class Methods

new() click to toggle source
# File lib/logstash/outputs/application_insights/shutdown_recovery.rb, line 26
def initialize
  configuration = Config.current
  @logger = configuration[:logger]
  @storage_account_name_key = configuration[:storage_account_name_key]
  @partition_key_prefix =configuration[:partition_key_prefix]

  @closing = nil
  @threads = []
end

Public Instance Methods

close() click to toggle source
# File lib/logstash/outputs/application_insights/shutdown_recovery.rb, line 45
def close
  @closing = true
  @threads.each do |thread|
    thread.join
  end
end
start() click to toggle source
# File lib/logstash/outputs/application_insights/shutdown_recovery.rb, line 36
def start
  @storage_recovery = Storage_recovery.instance
  @notification_recovery = Notification_recovery.instance
  @storage_account_name_key.each do |storage_account_name, storage_account_keys|
    @threads << recovery_thread( storage_account_name, :uploading)
    @threads << recovery_thread( storage_account_name, :committed)
  end
end

Private Instance Methods

recovery_thread( storage_account_name, state ) click to toggle source
# File lib/logstash/outputs/application_insights/shutdown_recovery.rb, line 58
def recovery_thread( storage_account_name, state )
  Thread.new( storage_account_name, state ) do |storage_account_name, state|
    
    blob = Blob.new

    committed_tuples = [  ]
    uncommitted_tuples = [  ]
    upload_empty_tuples = [  ]
    token = nil
    finished = false
    filter = "#{:PartitionKey} eq '#{@partition_key_prefix}-#{state}'"

    # should exit thread after fetching data from table, and submit recovery, the loop is only for case of failure
    until finished || stopped? do
      entities = blob.state_table_query( storage_account_name, filter, token )
      if entities
        token = entities.continuation_token

        if :committed == state
          entities.each do |entity|
            State.instance.inc_pending_notifications
            tuple = blob.table_entity_to_tuple( entity.properties )
            @notification_recovery.enqueue( tuple )
          end

        elsif :uploading == state
          # first tuples are collected, before send to queues, to make sure blob states don't change in between
          entities.each do |entity|
            typed_tuple = nil
            until typed_tuple || stopped?
              blob.table_entity_to_context( entity.properties )
              typed_tuple = blob.update_commited_or_uncommited_list
              Stud.stoppable_sleep(60, 1) { stopped? } unless typed_tuple
            end

            next if stopped?

            if typed_tuple[:committed]
              committed_tuples << typed_tuple[:committed]
            elsif typed_tuple[:uncommitted]
              uncommitted_tuples << typed_tuple[:uncommitted]
            else
              upload_empty_tuples << typed_tuple[:upload_empty]
            end
          end
        end

        next if token

        committed_tuples.each do |tuple|
          State.instance.inc_pending_commits
          @storage_recovery.recover_later( tuple, :state_table_update, storage_account_name )
        end

        uncommitted_tuples.each do |tuple|
          State.instance.inc_pending_commits
          @storage_recovery.recover_later( tuple, :commit, storage_account_name )
        end

        upload_empty_tuples.each do |tuple|
          @storage_recovery.recover_later( tuple, :state_table_update, storage_account_name )
        end

        finished = true
      else
        Stud.stoppable_sleep(60, 1) { stopped? }
      end
    end
    @logger.info { "exit table recovery thread, storage: #{storage_account_name}, state: #{state}, entities: #{entities ? entities.length : nil}" }
  end
end
stopped?() click to toggle source
# File lib/logstash/outputs/application_insights/shutdown_recovery.rb, line 54
def stopped?
  @closing
end