class LogStash::Outputs::Application_insights::Storage_recovery
Public Class Methods
instance()
click to toggle source
# File lib/logstash/outputs/application_insights/storage_recovery.rb, line 118 def self.instance @@instance end
Private Class Methods
new()
click to toggle source
# File lib/logstash/outputs/application_insights/storage_recovery.rb, line 26 def initialize configuration = Config.current @logger = configuration[:logger] @storage_account_name_key = configuration[:storage_account_name_key] @queues = { :commit => {}, :notify => {}, :state_table_update => {} } init_queues( @storage_account_name_key, @queues ) @closing = nil @threads = [] end
Public Instance Methods
close()
click to toggle source
# File lib/logstash/outputs/application_insights/storage_recovery.rb, line 60 def close @closing = true # @threads.each do |thread| # thread.join # end end
recover_later( tuple, action , storage_account_name )
click to toggle source
# File lib/logstash/outputs/application_insights/storage_recovery.rb, line 47 def recover_later ( tuple, action , storage_account_name ) if stopped? if :commit == action @state ||= State.instance @state.dec_pending_commits @shutdown ||= Shutdown.instance @shutdown.display_msg("!!! commit won't recover in this session due to shutdown") end else @queues[action][storage_account_name] << tuple end end
start()
click to toggle source
# File lib/logstash/outputs/application_insights/storage_recovery.rb, line 38 def start @storage_account_name_key.each do |storage_account_name, storage_account_keys| # a threads, per storage account name @queues.each_key do |action| @threads << recovery_thread( storage_account_name, action ) end end end
Private Instance Methods
init_queues( storage_account_name_key, queues )
click to toggle source
# File lib/logstash/outputs/application_insights/storage_recovery.rb, line 73 def init_queues ( storage_account_name_key, queues ) storage_account_name_key.each do |storage_account_name, storage_account_keys| queues.each_key do |action| queues[action][storage_account_name] = Queue.new end end end
recovery_thread( storage_account_name, action )
click to toggle source
# File lib/logstash/outputs/application_insights/storage_recovery.rb, line 81 def recovery_thread( storage_account_name, action ) # a threads, per storage account name, that retries actions on storage Thread.new( storage_account_name, action ) do |storage_account_name, action| counter = Concurrent::AtomicFixnum.new(0) queue = @queues[action][storage_account_name] loop do tuple = queue.pop Stud.stoppable_sleep(Float::INFINITY, 1) { ( state_on?( storage_account_name ) || stopped? ) && 10 > counter.value } if stopped? && !state_on?( storage_account_name ) recover_later( tuple, action, storage_account_name ) else counter.increment Thread.new( action, counter, tuple ) do |action, counter, tuple| if :notify == action Notification.new( tuple ).notify elsif :commit == action Upload_pipe.new( nil, nil, tuple ).commit elsif :state_table_update == action Blob.new( tuple ).state_table_update end counter.decrement end end tuple = nil # release for GC end end end
state_on?( storage_account_name )
click to toggle source
# File lib/logstash/outputs/application_insights/storage_recovery.rb, line 110 def state_on? ( storage_account_name ) Clients.instance.storage_account_state_on?( storage_account_name ) end
stopped?()
click to toggle source
# File lib/logstash/outputs/application_insights/storage_recovery.rb, line 69 def stopped? @closing end