class LogStash::Outputs::Application_insights::Channel
Attributes
blob_extension[R]
blob_max_delay[R]
event_format[R]
instrumentation_key[R]
table_id[R]
Public Class Methods
new( instrumentation_key, table_id )
# File lib/logstash/outputs/application_insights/channel.rb, line 33
def initialize ( instrumentation_key, table_id )
  @closing = false
  configuration = Config.current

  @disable_truncation = configuration[:disable_truncation]
  # when compression is enabled, events are staged in a local gzip file
  @file_pipe = !configuration[:disable_compression]
  @gzip_file = !configuration[:disable_compression]
  @blob_max_bytesize = configuration[:blob_max_bytesize]
  @blob_max_events = configuration[:blob_max_events]

  @logger = configuration[:logger]
  @logger.debug { "Create a new channel, instrumentation_key / table_id : #{instrumentation_key} / #{table_id}" }

  @instrumentation_key = instrumentation_key
  @table_id = table_id
  set_table_properties( configuration )

  @semaphore = Mutex.new
  @workers_channel = { }
  @failed_on_notify_retry_Q = Queue.new
  launch_notify_recovery_thread

  @blob_extension = ".#{@event_format}"
  if file_pipe?
    @blob_extension = "_#{@event_format}.gz" if gzip_file?
    @add_pipe_threshold = 0
    @file_prefix = configuration[:local_file_prefix]
    @file = nil
    @failed_on_file_upload_retry_Q = Queue.new
    launch_file_upload_recovery_thread
  else
    @add_pipe_threshold = CHANNEL_THRESHOLD_TO_ADD_UPLOAD_PIPE
    @failed_on_block_upload_retry_Q = Queue.new
    launch_block_upload_recovery_thread
  end

  @active_upload_pipes = [ Upload_pipe.new( self, 1 ) ]
end
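The constructor's mode selection is easiest to see in isolation. A minimal standalone sketch, with a hypothetical configuration hash standing in for the plugin's Config.current:

configuration = { :disable_compression => false }   # hypothetical value

file_pipe = !configuration[:disable_compression]
gzip_file = !configuration[:disable_compression]
event_format = "json"                               # set by set_table_properties in the real class

blob_extension = ".#{event_format}"
blob_extension = "_#{event_format}.gz" if file_pipe && gzip_file
puts blob_extension   # => "_json.gz" when compression is enabled

When compression is enabled the channel buffers events into a local gzip file and ships whole files; otherwise it ships blocks directly through the upload pipes.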
Public Instance Methods
<<( data )
The received data is a hash of the event (it does not include metadata).
# File lib/logstash/outputs/application_insights/channel.rb, line 93
def << ( data )
  if @serialized_event_field && data[@serialized_event_field]
    serialized_event = serialize_serialized_event_field( data[@serialized_event_field] )
  else
    serialized_event = ( EXT_EVENT_FORMAT_CSV == @event_format ? serialize_to_csv( data ) : serialize_to_json( data ) )
  end

  if serialized_event
    sub_channel = @workers_channel[Thread.current] || @semaphore.synchronize { @workers_channel[Thread.current] = Sub_channel.new( @event_separator ) }
    sub_channel << serialized_event
  else
    @logger.warn { "event not uploaded, no relevant data in event. table_id: #{@table_id}, event: #{data}" }
  end
end
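The per-thread Sub_channel lookup above is the hot path: the hash read is lock-free once a thread has registered itself, and only the first event from each worker takes the mutex. A standalone sketch of the same pattern, with plain arrays standing in for Sub_channel:

semaphore = Mutex.new
workers_channel = { }

threads = 4.times.map do |i|
  Thread.new do
    10.times do |n|
      sub_channel = workers_channel[Thread.current] ||
                    semaphore.synchronize { workers_channel[Thread.current] = [ ] }
      sub_channel << "event #{i}-#{n}"
    end
  end
end
threads.each( &:join )
puts workers_channel.values.map( &:length ).inspect   # => [10, 10, 10, 10]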
close()
# File lib/logstash/outputs/application_insights/channel.rb, line 81
def close
  @closing = true
  @active_upload_pipes.each do |upload_pipe|
    upload_pipe.close
  end
end
file_pipe?()
# File lib/logstash/outputs/application_insights/channel.rb, line 77
def file_pipe?
  @file_pipe
end
flush()
# File lib/logstash/outputs/application_insights/channel.rb, line 109
def flush
  if file_pipe?
    gz_collect_and_compress_blocks_to_file
    if file_expired_or_full?
      enqueue_to_pipe( [ @file ] )
      @file = nil
    end
  else
    list = collect_blocks
    enqueue_to_pipe( list )
  end
end
gzip_file?()
# File lib/logstash/outputs/application_insights/channel.rb, line 73
def gzip_file?
  @gzip_file
end
recover_later_block_upload( block_to_upload )
# File lib/logstash/outputs/application_insights/channel.rb, line 128
def recover_later_block_upload( block_to_upload )
  @failed_on_block_upload_retry_Q << block_to_upload
end
recover_later_file_upload( file_to_upload )
# File lib/logstash/outputs/application_insights/channel.rb, line 132
def recover_later_file_upload( file_to_upload )
  @failed_on_file_upload_retry_Q << file_to_upload
end
recover_later_notification( tuple )
# File lib/logstash/outputs/application_insights/channel.rb, line 123
def recover_later_notification( tuple )
  @failed_on_notify_retry_Q << tuple
end
stopped?()
# File lib/logstash/outputs/application_insights/channel.rb, line 88
def stopped?
  @closing
end
Private Instance Methods
collect_blocks()
# File lib/logstash/outputs/application_insights/channel.rb, line 186
def collect_blocks
  workers_channel = @semaphore.synchronize { @workers_channel.dup }
  full_block_list = [ ]
  prev_last_block = nil

  workers_channel.each_value do |worker_channel|
    block_list = worker_channel.get_block_list!
    unless block_list.empty?
      # hold back each worker's last (possibly partial) block and try to
      # merge it with the previous worker's last block
      last_block = block_list.pop
      full_block_list.concat( block_list )
      if prev_last_block
        unless prev_last_block.concat( last_block )
          full_block_list << prev_last_block
          prev_last_block = last_block
        end
      else
        prev_last_block = last_block
      end
    end
  end
  full_block_list << prev_last_block if prev_last_block
  full_block_list
end
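The merging of the trailing blocks is the subtle part: each worker's last block may be only partially full, so consecutive last blocks are concatenated while they fit, which avoids uploading many small blocks. A standalone sketch with strings standing in for blocks and a hypothetical 10-byte capacity (the real Block#concat presumably enforces its own size limit and returns false when the merge would overflow):

block_capacity = 10   # hypothetical; the real limit belongs to the Block class

try_concat = lambda do |a, b|
  next false if a.bytesize + b.bytesize > block_capacity
  a << b
  true
end

per_worker_blocks = [ [ "aaaaaaaaaa", "aa" ], [ "bbb" ], [ "cccc" ] ]

full_block_list = [ ]
prev_last = nil
per_worker_blocks.each do |block_list|
  next if block_list.empty?
  last = block_list.pop
  full_block_list.concat( block_list )
  if prev_last
    unless try_concat.call( prev_last, last )
      full_block_list << prev_last
      prev_last = last
    end
  else
    prev_last = last
  end
end
full_block_list << prev_last if prev_last
p full_block_list   # => ["aaaaaaaaaa", "aabbbcccc"]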
enqueue_to_pipe( list )
# File lib/logstash/outputs/application_insights/channel.rb, line 211
def enqueue_to_pipe ( list )
  list.each do |block_or_file|
    block_or_file.seal
    find_upload_pipe << block_or_file
  end
end
file_expired_or_full?()
# File lib/logstash/outputs/application_insights/channel.rb, line 150
def file_expired_or_full?
  @file && ( @file.oldest_event_time + @blob_max_delay <= Time.now.utc ||
             @file.bytesize >= @blob_max_bytesize ||
             @file.events_count >= @blob_max_events )
end
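A standalone check of the same rollover predicate, with hypothetical thresholds and a struct standing in for Local_file:

blob_max_delay    = 3600        # seconds; hypothetical
blob_max_bytesize = 4_000_000   # hypothetical
blob_max_events   = 100_000     # hypothetical

file = Struct.new( :oldest_event_time, :bytesize, :events_count )
             .new( Time.now.utc - 7200, 1024, 10 )

expired_or_full = file.oldest_event_time + blob_max_delay <= Time.now.utc ||
                  file.bytesize >= blob_max_bytesize ||
                  file.events_count >= blob_max_events
puts expired_or_full   # => true: the oldest event is two hours old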
find_upload_pipe()
# File lib/logstash/outputs/application_insights/channel.rb, line 394
def find_upload_pipe
  min_upload_pipe = @active_upload_pipes[0]
  @active_upload_pipes.each do |upload_pipe|
    return upload_pipe unless min_upload_pipe.busy?
    min_upload_pipe = upload_pipe if upload_pipe.queue_size < min_upload_pipe.queue_size
  end
  if min_upload_pipe.busy? && min_upload_pipe.queue_size >= @add_pipe_threshold && @active_upload_pipes.length < MAX_CHANNEL_UPLOAD_PIPES
    @active_upload_pipes << ( min_upload_pipe = Upload_pipe.new( self, @active_upload_pipes.length + 1 ) )
  end
  min_upload_pipe
end
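The scan keeps the pipe with the smallest queue as the fallback candidate, and a new pipe is only spawned when that candidate is busy, its queue has reached the threshold, and the pipe limit has not been hit. A simplified standalone sketch of just the least-loaded selection (structs stand in for Upload_pipe; the early-return and grow-on-demand parts are omitted):

Pipe = Struct.new( :id, :queue_size )

pipes = [ Pipe.new( 1, 3 ), Pipe.new( 2, 1 ), Pipe.new( 3, 5 ) ]

min_pipe = pipes[0]
pipes.each do |pipe|
  min_pipe = pipe if pipe.queue_size < min_pipe.queue_size
end
puts "least loaded: pipe #{min_pipe.id}"   # => least loaded: pipe 2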
gz_collect_and_compress_blocks_to_file()
# File lib/logstash/outputs/application_insights/channel.rb, line 155
def gz_collect_and_compress_blocks_to_file
  workers_channel = @semaphore.synchronize { @workers_channel.dup }
  full_block_list = [ ]
  workers_channel.each_value do |worker_channel|
    full_block_list.concat( worker_channel.get_block_list! )
  end
  full_block_list.each do |block|
    block.partial_seal
    local_file << block
  end
end
launch_block_upload_recovery_thread()
# File lib/logstash/outputs/application_insights/channel.rb, line 219
def launch_block_upload_recovery_thread
  # recovery thread
  Thread.new do
    loop do
      block_to_upload = @failed_on_block_upload_retry_Q.pop
      until Clients.instance.storage_account_state_on? do
        Stud.stoppable_sleep( 60 ) { stopped? }
      end
      if block_to_upload
        enqueue_to_pipe( [ block_to_upload ] )
      end
    end
  end
end
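All three recovery threads (block, file, notification) share the same shape: block on a retry queue, wait for the storage account to come back, then re-enqueue. A standalone analogue with the plugin's Clients/Stud dependencies stubbed out by simple flags:

require 'thread'

retry_q = Queue.new
healthy = false   # stands in for Clients.instance.storage_account_state_on?
stopped = false   # stands in for stopped?

recovery = Thread.new do
  loop do
    item = retry_q.pop
    break if item == :shutdown
    sleep 0.01 until healthy || stopped   # the real code uses Stud.stoppable_sleep( 60 )
    puts "retrying #{item}" unless stopped
  end
end

retry_q << "block-1"
sleep 0.05
healthy = true
retry_q << :shutdown
recovery.join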
launch_file_upload_recovery_thread()
# File lib/logstash/outputs/application_insights/channel.rb, line 170
def launch_file_upload_recovery_thread
  # recovery thread
  Thread.new do
    loop do
      file_to_upload = @failed_on_file_upload_retry_Q.pop
      until Clients.instance.storage_account_state_on? do
        Stud.stoppable_sleep( 60 ) { stopped? }
      end
      if file_to_upload
        enqueue_to_pipe( [ file_to_upload ] )
      end
    end
  end
end
launch_notify_recovery_thread()
Launches a recovery thread for notifications that failed due to an Application Insights error, such as a wrong instrumentation key or a wrong schema.
# File lib/logstash/outputs/application_insights/channel.rb, line 236
def launch_notify_recovery_thread
  # recovery thread
  Thread.new do
    loop do
      tuple = @failed_on_notify_retry_Q.pop
      begin
        Stud.stoppable_sleep( 60 ) { stopped? }
      end until Clients.instance.storage_account_state_on? || stopped?
      if stopped?
        @state ||= State.instance
        @state.dec_pending_notifications
        @shutdown ||= Shutdown.instance
        @shutdown.display_msg( "!!! notification won't recover in this session due to shutdown" )
      else
        success = Notification.new( tuple ).notify
        while success && @failed_on_notify_retry_Q.length > 0
          tuple = @failed_on_notify_retry_Q.pop
          success = Notification.new( tuple ).notify
        end
      end
      tuple = nil # release for GC
    end
  end
end
local_file()
# File lib/logstash/outputs/application_insights/channel.rb, line 145
def local_file
  @file ||= Local_file.new( local_file_name, gzip_file? )
end
local_file_name()
# File lib/logstash/outputs/application_insights/channel.rb, line 138
def local_file_name
  time_utc = Time.now.utc
  strtime = time_utc.strftime( "%F-%H-%M-%S-%L" )
  "#{@file_prefix}_ikey-#{@instrumentation_key}_table-#{@table_id}_#{strtime}#{@blob_extension}"
end
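With hypothetical values, the scheme produces names like the following (a minimal runnable sketch):

file_prefix    = "logstash"                                # hypothetical
ikey           = "11111111-2222-3333-4444-555555555555"    # hypothetical
table_id       = "my_table"                                # hypothetical
blob_extension = "_json.gz"

strtime = Time.now.utc.strftime( "%F-%H-%M-%S-%L" )
puts "#{file_prefix}_ikey-#{ikey}_table-#{table_id}_#{strtime}#{blob_extension}"
# e.g. logstash_ikey-11111111-2222-3333-4444-555555555555_table-my_table_2017-01-01-12-00-00-000_json.gz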
serialize_array_to_csv( csv_array )
# File lib/logstash/outputs/application_insights/channel.rb, line 323
def serialize_array_to_csv ( csv_array )
  return nil if csv_array.empty?
  csv_string = csv_array.to_csv( :col_sep => @csv_separator )
  return csv_string if csv_string.bytesize < MAX_FIELD_BYTES || @disable_truncation

  index = 0
  csv_array.map! do |value|
    index += 1
    truncate_data_if_too_big( index.to_s, value )
  end
  csv_array.to_csv( :col_sep => @csv_separator )
end
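A minimal sketch of the happy path: Ruby's csv library renders the row with the configured separator (values are illustrative):

require 'csv'

csv_separator = ","                        # hypothetical; mirrors @csv_separator
csv_array = [ "INFO", "hello, world", 42 ]

puts csv_array.to_csv( :col_sep => csv_separator )
# => INFO,"hello, world",42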
serialize_serialized_event_field( data )
# File lib/logstash/outputs/application_insights/channel.rb, line 263
def serialize_serialized_event_field ( data )
  serialized_data = nil
  if data.is_a?( String )
    serialized_data = data
  elsif EXT_EVENT_FORMAT_CSV == @event_format
    if data.is_a?( Array )
      serialized_data = serialize_array_to_csv( data )
    elsif data.is_a?( Hash )
      serialized_data = serialize_to_csv( data )
    end
  elsif EXT_EVENT_FORMAT_JSON == @event_format
    if data.is_a?( Hash )
      serialized_data = serialize_to_json( data )
    elsif data.is_a?( Array ) && !@table_columns.nil?
      serialized_data = serialize_to_json( Hash[@table_columns.map { |column| column[:name] }.zip( data )] )
    end
  end
  serialized_data
end
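The Array-to-JSON branch pairs the configured column names with the positional values. A standalone sketch with hypothetical columns:

require 'json'

table_columns = [ { :name => "ts" }, { :name => "message" } ]   # hypothetical
data = [ "2017-01-01T00:00:00Z", "hello" ]

json_hash = Hash[table_columns.map { |column| column[:name] }.zip( data )]
puts json_hash.to_json
# => {"ts":"2017-01-01T00:00:00Z","message":"hello"}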
serialize_to_csv( data )
# File lib/logstash/outputs/application_insights/channel.rb, line 309
def serialize_to_csv ( data )
  return nil if @table_columns.nil?
  data = Utils.downcase_hash_keys( data ) if @case_insensitive_columns
  csv_array = [ ]
  @table_columns.each do |column|
    value = data[column[:field_name]] || column[:default] || @csv_default_value
    type = ( column[:type] || value.class.name ).downcase.to_sym
    csv_array << ( [:hash, :array, :json, :dynamic, :object].include?( type ) ? value.to_json : value )
  end
  serialize_array_to_csv( csv_array )
end
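A standalone sketch of the column mapping with hypothetical columns: missing fields fall back to the column default, then to the CSV default, and hash/array values are JSON-encoded into their cell:

require 'csv'
require 'json'

table_columns = [                                   # hypothetical columns
  { :field_name => "level", :default => "INFO" },
  { :field_name => "message" },
  { :field_name => "tags", :type => "array" }
]
csv_default_value = ""
data = { "message" => "hello", "tags" => [ "a", "b" ] }

csv_array = table_columns.map do |column|
  value = data[column[:field_name]] || column[:default] || csv_default_value
  type = ( column[:type] || value.class.name ).downcase.to_sym
  [ :hash, :array, :json, :dynamic, :object ].include?( type ) ? value.to_json : value
end
puts csv_array.to_csv
# => INFO,hello,"[""a"",""b""]"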
serialize_to_json( data )
# File lib/logstash/outputs/application_insights/channel.rb, line 284
def serialize_to_json ( data )
  if @table_columns.nil?
    json_hash = data
  else
    data = Utils.downcase_hash_keys( data ) if @case_insensitive_columns
    json_hash = { }
    @table_columns.each do |column|
      value = data[column[:field_name]] || column[:default]
      json_hash[column[:name]] = value if value
    end
  end
  return nil if json_hash.empty?

  json_string = json_hash.to_json
  return json_string if json_string.bytesize < MAX_FIELD_BYTES || @disable_truncation

  json_hash.each_pair do |name, value|
    json_hash[name] = truncate_data_if_too_big( name, value )
  end
  json_hash.to_json
end
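A standalone sketch of the column projection (hypothetical columns); nil values are dropped, so an event matching no column serializes to nothing:

require 'json'

table_columns = [                                                 # hypothetical columns
  { :name => "Level", :field_name => "level", :default => "INFO" },
  { :name => "Message", :field_name => "message" }
]
data = { "message" => "hello" }

json_hash = { }
table_columns.each do |column|
  value = data[column[:field_name]] || column[:default]
  json_hash[column[:name]] = value if value
end
puts json_hash.to_json
# => {"Level":"INFO","Message":"hello"}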
set_table_properties( configuration )
# File lib/logstash/outputs/application_insights/channel.rb, line 405
def set_table_properties ( configuration )
  table_properties = configuration[:tables][@table_id]

  if table_properties
    @blob_max_delay = table_properties[:blob_max_delay]
    @event_separator = table_properties[:event_separator]
    @serialized_event_field = table_properties[:serialized_event_field]
    @table_columns = table_properties[:table_columns]
    @event_format = table_properties[:blob_serialization]
    @case_insensitive_columns = table_properties[:case_insensitive_columns]
    @csv_default_value = table_properties[:csv_default_value]
    @csv_separator = table_properties[:csv_separator]
  end
  @blob_max_delay ||= configuration[:blob_max_delay]
  @event_separator ||= configuration[:event_separator]
  @serialized_event_field ||= configuration[:serialized_event_field]
  @table_columns ||= configuration[:table_columns]
  @event_format ||= configuration[:blob_serialization]
  @case_insensitive_columns ||= configuration[:case_insensitive_columns]
  @csv_default_value ||= configuration[:csv_default_value]
  @csv_separator ||= configuration[:csv_separator]

  # add field_name to each column; it is required to differentiate between the field name and the column name
  unless @table_columns.nil?
    @table_columns = @table_columns.map do |column|
      new_column = column.dup
      new_column[:field_name] = ( @case_insensitive_columns ? new_column[:name].downcase : new_column[:name] )
      new_column
    end
  end
end
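The method implements a per-table override with global fallback. A standalone sketch with hypothetical values:

configuration = {                                      # hypothetical configuration
  :blob_max_delay => 60,
  :event_separator => "\n",
  :tables => { "my_table" => { :blob_max_delay => 10 } }
}

table_properties = configuration[:tables]["my_table"]
blob_max_delay = table_properties && table_properties[:blob_max_delay]
blob_max_delay ||= configuration[:blob_max_delay]
puts blob_max_delay   # => 10: the table override wins over the global 60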
truncate_data_if_too_big( name, data )
# File lib/logstash/outputs/application_insights/channel.rb, line 337
def truncate_data_if_too_big ( name, data )
  return data if @disable_truncation
  truncated = nil

  if data.is_a?( String )
    if data.bytesize > MAX_FIELD_BYTES
      truncated = data.bytesize - MAX_FIELD_BYTES
      data = data.byteslice( 0, MAX_FIELD_BYTES )
    end

  elsif data.is_a?( Hash )
    str = data.to_json
    while str.bytesize > MAX_FIELD_BYTES
      truncated = str.bytesize - MAX_FIELD_BYTES unless truncated
      delta = str.bytesize - MAX_FIELD_BYTES
      # find the largest string value and cut it down by the overflow
      max_size = 0
      max_name = nil
      data.each_pair do |key, value|   # block variable renamed from `name` to avoid shadowing the method parameter
        if value.is_a?( String ) && value.bytesize > max_size
          max_name = key
          max_size = value.bytesize
        end
      end
      unless max_name
        data = {}
        break
      end
      data[max_name] = data[max_name].byteslice( 0, max_size - ( max_size > delta ? delta : max_size ) )
      str = data.to_json
    end

  elsif data.is_a?( Array )
    str = data.to_json
    while str.bytesize > MAX_FIELD_BYTES
      truncated = str.bytesize - MAX_FIELD_BYTES unless truncated
      delta = str.bytesize - MAX_FIELD_BYTES
      # find the largest string element and cut it down by the overflow
      max_size = 0
      max_index = nil
      data.each_index do |index|
        value = data[index]
        if value.is_a?( String ) && value.bytesize > max_size
          max_index = index
          max_size = value.bytesize
        end
      end
      unless max_index
        data = []
        break
      end
      data[max_index] = data[max_index].byteslice( 0, max_size - ( max_size > delta ? delta : max_size ) )
      str = data.to_json
    end
  end

  @logger.warn { "field #{name} was truncated by #{truncated} bytes, due to size above #{MAX_FIELD_BYTES} bytes. table_id: #{@table_id}" } if truncated
  data
end
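A standalone sketch of the simplest path, the string branch, with an illustrative 16-byte cap in place of MAX_FIELD_BYTES:

max_field_bytes = 16   # illustrative; the real limit is MAX_FIELD_BYTES
data = "x" * 20

if data.bytesize > max_field_bytes
  truncated = data.bytesize - max_field_bytes
  data = data.byteslice( 0, max_field_bytes )
  warn "field was truncated by #{truncated} bytes"
end
puts data.bytesize   # => 16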