class LogStash::Outputs::Application_insights::Upload_pipe
Public Class Methods
new( channel = nil, id = nil, tuple = nil )
click to toggle source
Calls superclass method
LogStash::Outputs::Application_insights::Blob::new
# File lib/logstash/outputs/application_insights/upload_pipe.rb, line 25 def initialize ( channel = nil, id = nil, tuple = nil ) # super first parameter must be nil. blob first parameter is channel, otherwise it will pass storage_account_name as channel super( tuple ) @channel = channel if @channel @id = id @instrumentation_key = @channel.instrumentation_key @table_id = @channel.table_id @blob_max_delay = @channel.blob_max_delay @blob_extension = @channel.blob_extension @event_format = @channel.event_format @file_pipe = @channel.file_pipe? @io_queue = Queue.new # create a thread that handles the IO of the blob if @file_pipe launch_file_pipe_thread else launch_block_pipe_thread end end end
Public Instance Methods
<<( block )
click to toggle source
# File lib/logstash/outputs/application_insights/upload_pipe.rb, line 150 def << ( block ) @io_queue << block end
busy?()
click to toggle source
# File lib/logstash/outputs/application_insights/upload_pipe.rb, line 146 def busy? 0 < @io_queue.length || 0 == @io_queue.num_waiting end
close()
click to toggle source
close blob. It will finish whatever was already on the queue, and if necessary commit called on shutdown
# File lib/logstash/outputs/application_insights/upload_pipe.rb, line 137 def close @io_queue << :close end
commit()
click to toggle source
# File lib/logstash/outputs/application_insights/upload_pipe.rb, line 155 def commit unless @uploaded_block_ids.empty? @action = :commit @recoverable = [ :invalid_storage_key, :io_failure, :service_unavailable ] success = storage_io_block { @info = "#{@action.to_s} #{@storage_account_name}/#{@container_name}/#{@blob_name}, events: #{@uploaded_events_count}, size: #{@uploaded_bytesize}, blocks: #{@uploaded_block_numbers}, delay: #{Time.now.utc - @oldest_event_time}" # assume that exceptions can be raised due to this method: @client.blobClient.commit_blob_blocks( @container_name, @blob_name, @uploaded_block_ids ) unless @configuration[:disable_blob_upload] @log_state = :committed } if success # next stage state_table_update else @storage_recovery.recover_later( context_to_tuple, :commit, @storage_account_name ) end end end
launch_block_pipe_thread()
click to toggle source
# File lib/logstash/outputs/application_insights/upload_pipe.rb, line 52 def launch_block_pipe_thread Thread.new do timer = Timer.new next_block = nil loop do block_to_upload = nil # release reference to resource for GC block_to_upload = next_block || @io_queue.pop next_block = nil if :trigger == timer.state next_block = block_to_upload unless :wakeup == block_to_upload block_to_upload = :timeout to_commit = :commit elsif :close == block_to_upload to_commit = :commit # ignore :trigger as they are only to casue check timeout elsif :wakeup == block_to_upload # ignore :wakeup next else while @io_queue.length > 0 next_block = @io_queue.pop next if :wakeup == next_block # ignore :wakeup break if :close == next_block break if blob_full?( next_block ) break unless block_to_upload.concat( next_block ) next_block = nil end end unless to_commit timer.set( block_to_upload.oldest_event_time + @blob_max_delay, nil ) {|object| @io_queue << :wakeup if 0 == @io_queue.length } if blob_empty? upload( block_to_upload ) block_to_upload = nil # release reference to resource for GC to_commit = :commit if blob_full? end if to_commit commit unless @uploaded_block_ids.empty? to_commit = nil @uploaded_block_ids = [ ] timer.cancel break if :close == block_to_upload end end end end
launch_file_pipe_thread()
click to toggle source
# File lib/logstash/outputs/application_insights/upload_pipe.rb, line 103 def launch_file_pipe_thread Thread.new do loop do file_to_upload = @io_queue.pop break if :close == file_to_upload file_to_upload.open_read @file_size = file_to_upload.file_size while block = file_to_upload.get_next_block unless upload( block ) # start the file from the begining file_to_upload.close_read @channel.recover_later_file_upload( file_to_upload ) file_to_upload = nil break end end if file_to_upload commit unless @uploaded_block_ids.empty? file_to_upload.dispose file_to_upload = nil end @uploaded_block_ids = [ ] end end end
queue_size()
click to toggle source
# File lib/logstash/outputs/application_insights/upload_pipe.rb, line 142 def queue_size @io_queue.length end
Private Instance Methods
blob_empty?()
click to toggle source
# File lib/logstash/outputs/application_insights/upload_pipe.rb, line 184 def blob_empty? @uploaded_block_ids.empty? end
blob_full?( next_block = nil )
click to toggle source
# File lib/logstash/outputs/application_insights/upload_pipe.rb, line 176 def blob_full? ( next_block = nil ) if next_block BLOB_MAX_BLOCKS < @uploaded_block_ids.length + 1 || @configuration[:blob_max_events] < @uploaded_events_count + next_block.events_count || @configuration[:blob_max_bytesize] < @uploaded_bytesize + next_block.bytesize else BLOB_MAX_BLOCKS <= @uploaded_block_ids.length || @configuration[:blob_max_events] <= @uploaded_events_count || @configuration[:blob_max_bytesize] <= @uploaded_bytesize end end
set_conatainer_and_blob_names()
click to toggle source
# File lib/logstash/outputs/application_insights/upload_pipe.rb, line 283 def set_conatainer_and_blob_names time_utc = Time.now.utc id = @id.to_s.rjust(4, "0") strtime = time_utc.strftime( "%F" ) @container_name = "#{AZURE_STORAGE_CONTAINER_LOGSTASH_PREFIX}#{@configuration[:azure_storage_container_prefix]}-#{strtime}" strtime = time_utc.strftime( "%F-%H-%M-%S-%L" ) @blob_name = "#{AZURE_STORAGE_BLOB_LOGSTASH_PREFIX}#{@configuration[:azure_storage_blob_prefix]}/ikey-#{@instrumentation_key}/table-#{@table_id}/#{strtime}_#{id}#{@blob_extension}" end
upload( block )
click to toggle source
returns success / failure
# File lib/logstash/outputs/application_insights/upload_pipe.rb, line 217 def upload ( block ) @storage_account_name = nil if @uploaded_block_ids.empty? @block_to_upload = block block = nil # remove reference for GC exclude_storage_account_names = [ ] success = false begin if @uploaded_block_ids.empty? @log_state = :uploading @uploaded_block_numbers = [ ] @uploaded_bytesize = 0 @uploaded_events_count = 0 @oldest_event_time = nil # remove "loading" record from state table of first block upload that failed, we will try on alternative storage if @storage_account_name exclude_storage_account_names << @storage_account_name @storage_recovery.recover_later( context_to_tuple, :state_table_update, @storage_account_name ) end set_conatainer_and_blob_names @storage_account_name = Clients.instance.get_random_active_storage( exclude_storage_account_names ) unless @storage_account_name @recovery = :io_all_dead upload_retry_later return false end raise UploadRetryError unless state_table_insert end @action = :upload @block_info = "blocks: #{@block_to_upload.block_numbers}, events: #{@block_to_upload.events_count}, size: #{@block_to_upload.bytes.length}" @info = "#{@action} #{@storage_account_name}/#{@container_name}/#{@blob_name}, #{@block_info}, commitId: [\"#{100001 + @uploaded_block_ids.length}\"]" @recoverable = [ :invalid_storage_key, :invalid_storage_account, :io_failure, :service_unavailable, :container_exist, :create_container ] success = storage_io_block { create_container_exist_recovery block_id = "#{100001 + @uploaded_block_ids.length}" # assume that exceptions can be raised due to this method: @client.blobClient.put_blob_block( @container_name, @blob_name, block_id, @block_to_upload.bytes ) unless @configuration[:disable_blob_upload] # upload success first_block_in_blob = @uploaded_block_ids.empty? @uploaded_block_ids << [ block_id ] @uploaded_block_numbers.concat( @block_to_upload.block_numbers ) @uploaded_bytesize += @block_to_upload.bytes.length @uploaded_events_count += @block_to_upload.events_count @oldest_event_time ||= @block_to_upload.oldest_event_time # release memory bytesize = @block_to_upload.bytesize @block_to_upload.dispose @block_to_upload = nil State.instance.inc_pending_commits if first_block_in_blob State.instance.dec_upload_bytesize( bytesize ) } upload_retry_later unless success rescue UploadRetryError @recovery = nil retry end success end
upload_retry_later()
click to toggle source
# File lib/logstash/outputs/application_insights/upload_pipe.rb, line 189 def upload_retry_later unless @uploaded_block_ids.empty? if @file_pipe # remove "loading" record from state table of all previous blocks uploaded , we will try the whole file on an alternative storage @storage_recovery.recover_later( context_to_tuple, :state_table_update, @storage_account_name ) # memory is decrmeneted because the retry is done from the begining of the file bytesize = @block_to_upload.bytesize @block_to_upload.dispose @block_to_upload = nil State.instance.dec_upload_bytesize( bytesize ) return else info1 = "#{:commit} #{@storage_account_name}/#{@container_name}/#{@blob_name}, events: #{@uploaded_events_count}, size: #{@uploaded_bytesize}, blocks: #{@uploaded_block_numbers}, delay: #{Time.now.utc - @oldest_event_time}" @logger.error { "Pospone to #{info1} (; retry later, error: #{@last_io_exception.inspect}" } @storage_recovery.recover_later( context_to_tuple, :commit, @storage_account_name ) @uploaded_block_ids = [ ] end end if :io_all_dead == @recovery @channel.recover_later_block_upload( @block_to_upload ) unless @file_pipe @block_to_upload = nil else raise UploadRetryError end end