class LogStash::Outputs::Application_insights::Storage_recovery

Public Class Methods

instance() click to toggle source
# File lib/logstash/outputs/application_insights/storage_recovery.rb, line 118
def self.instance
  @@instance
end

Private Class Methods

new() click to toggle source
# File lib/logstash/outputs/application_insights/storage_recovery.rb, line 26
def initialize
  configuration = Config.current
  @logger = configuration[:logger]
  @storage_account_name_key = configuration[:storage_account_name_key]

  @queues = { :commit => {}, :notify => {}, :state_table_update => {} }
  init_queues( @storage_account_name_key, @queues )

  @closing = nil
  @threads = []
end

Public Instance Methods

close() click to toggle source
# File lib/logstash/outputs/application_insights/storage_recovery.rb, line 60
def close
  @closing = true
  # @threads.each do |thread|
  #   thread.join
  # end
end
recover_later( tuple, action , storage_account_name ) click to toggle source
# File lib/logstash/outputs/application_insights/storage_recovery.rb, line 47
def recover_later ( tuple, action , storage_account_name )
  if stopped?
    if :commit == action
      @state ||= State.instance
      @state.dec_pending_commits
      @shutdown ||= Shutdown.instance
      @shutdown.display_msg("!!! commit won't recover in this session due to shutdown")
    end
  else
    @queues[action][storage_account_name] << tuple
  end
end
start() click to toggle source
# File lib/logstash/outputs/application_insights/storage_recovery.rb, line 38
def start
  @storage_account_name_key.each do |storage_account_name, storage_account_keys|
    # a threads, per storage  account name
    @queues.each_key do |action|
      @threads << recovery_thread( storage_account_name, action )
    end
  end
end

Private Instance Methods

init_queues( storage_account_name_key, queues ) click to toggle source
# File lib/logstash/outputs/application_insights/storage_recovery.rb, line 73
def init_queues ( storage_account_name_key, queues )
  storage_account_name_key.each do |storage_account_name, storage_account_keys|
    queues.each_key do |action|
      queues[action][storage_account_name] = Queue.new
    end
  end
end
recovery_thread( storage_account_name, action ) click to toggle source
# File lib/logstash/outputs/application_insights/storage_recovery.rb, line 81
def recovery_thread( storage_account_name, action )
  # a threads, per storage  account name, that retries actions on storage
  Thread.new( storage_account_name, action ) do |storage_account_name, action|
    counter = Concurrent::AtomicFixnum.new(0)
    queue = @queues[action][storage_account_name]
    loop do
      tuple = queue.pop
      Stud.stoppable_sleep(Float::INFINITY, 1) { ( state_on?( storage_account_name )  || stopped? ) && 10 > counter.value }

      if stopped? && !state_on?( storage_account_name )
        recover_later( tuple, action, storage_account_name )
      else
        counter.increment
        Thread.new( action, counter, tuple ) do |action, counter, tuple|
          if :notify == action
            Notification.new( tuple ).notify
          elsif :commit == action
            Upload_pipe.new( nil, nil, tuple ).commit
          elsif :state_table_update == action
            Blob.new( tuple ).state_table_update
          end
          counter.decrement
        end
      end
      tuple = nil # release for GC
    end
  end
end
state_on?( storage_account_name ) click to toggle source
# File lib/logstash/outputs/application_insights/storage_recovery.rb, line 110
def state_on? ( storage_account_name )
  Clients.instance.storage_account_state_on?( storage_account_name )
end
stopped?() click to toggle source
# File lib/logstash/outputs/application_insights/storage_recovery.rb, line 69
def stopped?
  @closing
end