class LogStash::Outputs::Application_insights::Blob
Constants
- CREATE_EXIST_ERRORS
Attributes
last_io_exception[R]
Public Class Methods
close()
click to toggle source
# File lib/logstash/outputs/application_insights/blob.rb, line 30 def self.close @@closing = true end
new( tuple = nil )
click to toggle source
Calls superclass method
# File lib/logstash/outputs/application_insights/blob.rb, line 38 def initialize ( tuple = nil ) @configuration = Config.current @logger = @configuration[:logger] @storage_recovery = Storage_recovery.instance @notification_recovery = Notification_recovery.instance @max_tries = @configuration[:io_max_retries] + 1 super( tuple ) end
stopped?()
click to toggle source
# File lib/logstash/outputs/application_insights/blob.rb, line 34 def self.stopped? @@closing end
Public Instance Methods
create_container_exist_recovery()
click to toggle source
# File lib/logstash/outputs/application_insights/blob.rb, line 68 def create_container_exist_recovery create_exist_recovery( :container, @container_name ) { |name| @client.blobClient.create_container( name ) } end
create_exist_recovery( type, name ) { |name| ... }
click to toggle source
# File lib/logstash/outputs/application_insights/blob.rb, line 49 def create_exist_recovery( type, name ) if CREATE_EXIST_ERRORS[type][0] == @recovery @prev_info = @info @info = "create #{type} #{@storage_account_name}/#{name}" # assume that exceptions can be raised due to this method: yield name @logger.info { "Successed to #{@info}" } @info = @prev_info elsif CREATE_EXIST_ERRORS[type][1] == @recovery @logger.info { "Successed (already exist) to #{@info}" } @info = @prev_info end end
create_table_exist_recovery()
click to toggle source
# File lib/logstash/outputs/application_insights/blob.rb, line 64 def create_table_exist_recovery create_exist_recovery( :table, @configuration[:state_table_name] ) { |name| @client.tableClient.create_table( name ) } end
state_table_delete( state = nil )
click to toggle source
return true on success
# File lib/logstash/outputs/application_insights/blob.rb, line 111 def state_table_delete ( state = nil ) state ||= @log_state @action = :state_table_delete @recoverable = [ :invalid_storage_key, :io_failure, :service_unavailable, :table_exist, :create_table, :table_busy, :create_resource ] @info = "#{@action} #{state} #{@storage_account_name}/#{@container_name}/#{@blob_name}" success = storage_io_block { create_table_exist_recovery if :create_resource == @recovery @logger.info { "Note: delete entity failed, already deleted, #{@info}, state: #{state}, log_state: #{@log_state}" } else @client.tableClient.delete_entity( @configuration[:state_table_name], "#{@configuration[:partition_key_prefix]}-#{state}", @blob_name.gsub( "/", "_" ) ) end } @storage_recovery.recover_later( context_to_tuple, :state_table_update, @storage_account_name ) unless success success end
state_table_insert()
click to toggle source
return true on success
# File lib/logstash/outputs/application_insights/blob.rb, line 73 def state_table_insert @action = :state_table_insert @recoverable = [ :invalid_storage_key, :io_failure, :service_unavailable, :table_exist, :create_table, :table_busy, :entity_exist ] @info = "#{@action} #{@log_state} #{@storage_account_name}/#{@container_name}/#{@blob_name}" success = storage_io_block { create_table_exist_recovery if :entity_exist == @recovery raise NotRecoverableError if :uploading == @log_state else entity_values = context_to_table_entity entity_values[:PartitionKey] = "#{@configuration[:partition_key_prefix]}-#{@log_state}" entity_values[:RowKey] = @blob_name.gsub("/","_") @client.tableClient.insert_entity( @configuration[:state_table_name], entity_values ) end } @storage_recovery.recover_later( context_to_tuple, :state_table_update, @storage_account_name ) unless success || :uploading == @log_state success end
state_table_query( storage_account_name, filter , token )
click to toggle source
return entities
# File lib/logstash/outputs/application_insights/blob.rb, line 130 def state_table_query ( storage_account_name, filter , token ) @storage_account_name = storage_account_name @action = :state_table_query @recoverable = [ :invalid_storage_key, :io_failure, :service_unavailable, :table_exist, :create_table, :table_busy ] @info = "#{@action} #{@storage_account_name}/#{@configuration[:state_table_name]}" entities = nil success = storage_io_block { create_table_exist_recovery options = { :filter => filter } options[:continuation_token] = token if token entities = @client.tableClient.query_entities( @configuration[:state_table_name], options ) } entities end
state_table_update()
click to toggle source
# File lib/logstash/outputs/application_insights/blob.rb, line 92 def state_table_update if :uploading == @log_state state_table_delete elsif :committed == @log_state if state_table_insert && state_table_delete( :uploading ) State.instance.dec_pending_commits State.instance.inc_pending_notifications # this is not a recovery, it is actually enqueue to notify @notification_recovery.enqueue( context_to_tuple ) end elsif :notified == @log_state if (!@configuration[:save_notified_blobs_records] || state_table_insert) && state_table_delete( :committed ) State.instance.dec_pending_notifications end end end
update_commited_or_uncommited_list()
click to toggle source
# File lib/logstash/outputs/application_insights/blob.rb, line 148 def update_commited_or_uncommited_list @action = :list_blob_blocks @recoverable = [ :invalid_storage_key, :io_failure, :service_unavailable, :container_exist, :create_container, :create_blob ] list_blob_blocks = nil success = storage_io_block { @info = "#{@action} #{@storage_account_name}/#{@container_name}/#{@blob_name}" create_container_exist_recovery if :create_blob == @recovery list_blob_blocks = { :uncommitted => [ ], :committed => [ ] } else list_blob_blocks = @client.blobClient.list_blob_blocks( @container_name, @blob_name, { :blocklist_type => :all } ) unless :create_blob == @recovery end } if list_blob_blocks blocks = ( list_blob_blocks[:uncommitted].empty? ? list_blob_blocks[:committed] : list_blob_blocks[:uncommitted] ) blocks.each do |block| @uploaded_block_ids << [ block.name ] @uploaded_bytesize += block.size end if 0 < @file_size && @uploaded_bytesize != @file_size type = :upload_empty else type = ( blocks.empty? || 0 == @uploaded_bytesize ? :upload_empty : blocks[0].type ) end @log_state = :committed if :committed == type { type => context_to_tuple } else nil end end
Private Instance Methods
error_to_sym( e )
click to toggle source
# File lib/logstash/outputs/application_insights/blob.rb, line 216 def error_to_sym ( e ) if e.is_a?( Azure::Core::Http::HTTPError ) if 404 == e.status_code if "ContainerNotFound" == e.type :create_container elsif "TableNotFound" == e.type :create_table elsif "BlobNotFound" == e.type :create_blob elsif "ResourceNotFound" == e.type :create_resource else :create_resource end elsif 409 == e.status_code if "ContainerAlreadyExists" == e.type :container_exist elsif "BlobAlreadyExists" == e.type :blob_exist elsif "TableAlreadyExists" == e.type :table_exist elsif "TableBeingDeleted" == e.type :table_busy elsif "EntityAlreadyExists" == e.type :entity_exist else :http_unknown end elsif 403 == e.status_code if "AuthenticationFailed" == e.type :invalid_storage_key elsif "Unknown" == e.type && e.description.include?("Blob does not exist or not accessible.") :notify_failed_blob_not_accessible else :access_denied end elsif 400 == e.status_code && "Unknown" == e.type && e.description.include?("Invalid instrumentation key") :invalid_instrumentation_key elsif 500 == e.status_code && "Unknown" == e.type && e.description.include?("Processing error") :notification_process_down elsif 501 == e.status_code :not_implemented elsif 503 == e.status_code :service_unavailable else :http_unknown end # communication error elsif e.is_a?( Faraday::ClientError ) :io_failure # communication error elsif e.is_a?( IOError ) :io_failure # all storage accounts are dead, couldn't get client (internal exception) elsif e.is_a?( StorageAccountsOffError ) :io_all_dead # all storage accounts are dead, couldn't get client (internal exception) elsif e.is_a?( NotRecoverableError ) :not_recoverable elsif e.is_a?( NameError ) && e.message.include?( "uninitialized constant Azure::Core::Auth::Signer::OpenSSL" ) :init_error elsif e.is_a?( NameError ) && e.message.include?( "uninitialized constant Azure::Storage::Auth::SharedAccessSignature" ) :init_error else :unknown end end
recover_retry?( e )
click to toggle source
# File lib/logstash/outputs/application_insights/blob.rb, line 310 def recover_retry? ( e ) recovery = error_to_sym( e ) if :init_error == recovery @client = @client.dispose if @client sleep( 1 ) recovery = nil elsif :http_unknown == recovery || :unknown == recovery puts "\n>>>> UNKNOWN error - #{e.inspect} <<<<\n" raise e if @configuration[:stop_on_unknown_io_errors] end return [nil, recovery] unless recovery && @recoverable.include?( recovery ) case recovery when :container_exist, :table_exist, :entity_exist, :create_container, :create_table # ignore log error # @logger.error { "Failed to #{@info} ;( recovery: continue, error: #{e.inspect}" } when :invalid_storage_key, :notify_failed_blob_not_accessible if @client.switch_storage_account_key! @logger.error { "Failed to #{@info} ;( recovery: switched to secondary storage key, error: #{e.inspect}" } else @client = @client.dispose( :auth_to_storage_failed ) if @client && :invalid_storage_key == recovery return [nil, recovery] end when :table_busy @client = @client.dispose if @client sleep( @configuration[:io_retry_delay] ) @logger.error { "Failed to #{@info} ;( recovery: retry, error: #{e.inspect}" } when :io_failure, :service_unavailable, :notification_process_down if @try_count < @max_tries @client = @client.dispose if @client sleep( @configuration[:io_retry_delay] ) @logger.error { "Failed to #{@info} ;( recovery: retry, try #{@try_count} / #{@max_tries}, error: #{e.inspect}" } @try_count += 1 else if :io_failure == recovery || ( :service_unavailable == recovery && :notify != @action ) @client = @client.dispose( :io_to_storage_failed ) if @client end return [nil, recovery] end when :invalid_instrumentation_key, :invalid_table_id if :notify == @action # only for notify, not for test endpoint if @try_count < @max_tries @client = @client.dispose if @client sleep( @configuration[:io_retry_delay] ) @logger.error { "Failed to #{@info} ;( recovery: retry, try #{@try_count} / #{@max_tries}, error: #{e.inspect}" } @try_count += 1 else if :invalid_instrumentation_key == recovery Channels.instance.mark_invalid_instrumentation_key( @instrumentation_key ) elsif :invalid_table_id == recovery Channels.instance.mark_invalid_table_id( @table_id ) end return [nil, recovery] end end end [recovery, recovery] end
storage_io_block() { || ... }
click to toggle source
# File lib/logstash/outputs/application_insights/blob.rb, line 186 def storage_io_block @recovery = nil @try_count = 1 begin @client ||= Client.new( @storage_account_name, @force_client ) yield disabled = :notify == @action ? @configuration[:disable_notification] : @configuration[:disable_blob_upload] @logger.info { "Successed to #{disabled ? 'DISABLED ' : ''}#{@info}" } true rescue TypeError raise rescue StandardError => e @last_io_exception = e @recovery, reason = recover_retry?( e ) retry if @recovery || reason.nil? @recovery = reason @logger.error { "Failed to #{@info} ; retry later, error: #{e.inspect}" } false ensure @client = @client.dispose if @client end end