class LogStash::Outputs::Application_insights::Channels
Public Class Methods
instance()
click to toggle source
# File lib/logstash/outputs/application_insights/channels.rb, line 139
# Class-level accessor: returns whatever is currently stored in the
# @@instance class variable. The assignment of @@instance is not visible
# in this excerpt.
def self.instance
  @@instance
end
Private Class Methods
new()
click to toggle source
# File lib/logstash/outputs/application_insights/channels.rb, line 27
# Builds the channel registry from the current configuration.
#
# Reads the logger, the default instrumentation key, the default table id
# and the tables setting from Config.current, and starts with no channels.
def initialize
  config = Config.current

  # settings taken from the configuration
  @logger = config[:logger]
  @tables = config[:tables]
  @default_instrumentation_key = config[:instrumentation_key]
  @default_table_id = config[:table_id]

  # internal state: lookup table keyed by instrumentation key, then by
  # table id, the flat list of all channels, and the mutex taken when
  # channels are created
  @create_semaphore = Mutex.new
  @instrumentation_key_table_id_db = {}
  @channels = []
end
Public Instance Methods
channel( instrumentation_key, table_id )
click to toggle source
# File lib/logstash/outputs/application_insights/channels.rb, line 65
# Returns the channel for the given instrumentation key and table id,
# creating it when dispatch_channel reports that none exists yet.
def channel( instrumentation_key, table_id )
  dispatch_channel( instrumentation_key, table_id )
rescue NoChannelError
  # nothing registered for this pair yet - try to create it
  begin
    create_channel( instrumentation_key, table_id )
  rescue ChannelExistError
    # can happen due to race conditions: the channel was created between
    # the failed lookup and the creation attempt, so look it up again
    dispatch_channel( instrumentation_key, table_id )
  end
end
close()
click to toggle source
# File lib/logstash/outputs/application_insights/channels.rb, line 121
# Closes every channel known at the time of the call.
#
# The channel list is copied while holding @create_semaphore, the same way
# periodic_forward_events does, so channels appended by create_channel
# while the close is in progress do not disturb the iteration. The lock is
# released before any channel is closed.
#
# Returns the array of channels that were closed.
def close
  snapshot = @create_semaphore.synchronize { @channels.dup }
  snapshot.each do |channel|
    channel.close
  end
end
mark_invalid_instrumentation_key( instrumentation_key )
click to toggle source
# File lib/logstash/outputs/application_insights/channels.rb, line 127
# Placeholder: currently does nothing with the given instrumentation key
# and returns nil.
def mark_invalid_instrumentation_key( instrumentation_key )
  # TODO should go to lost and found container
end
mark_invalid_table_id( table_id )
click to toggle source
# File lib/logstash/outputs/application_insights/channels.rb, line 131
# Placeholder: currently does nothing with the given table id and
# returns nil.
def mark_invalid_table_id( table_id )
  # TODO should go to lost and found container
end
periodic_forward_events()
click to toggle source
# File lib/logstash/outputs/application_insights/channels.rb, line 79
# Starts a background thread that loops forever: it sleeps half a second,
# then flushes every channel registered at that moment.
#
# Returns the Thread that was started.
def periodic_forward_events
  Thread.new do
    loop do
      sleep( 0.5 )
      # copy the list under the creation lock so that flushing runs on a
      # stable snapshot and without holding the lock
      snapshot = @create_semaphore.synchronize { @channels.dup }
      snapshot.each( &:flush )
    end
  end
end
receive( event, encoded_event )
click to toggle source
# File lib/logstash/outputs/application_insights/channels.rb, line 48
# Routes one event to the channel that matches its table id and
# instrumentation key.
#
# LogStash::SHUTDOWN and LogStash::FLUSH events are only logged. For any
# other event, the table id and the instrumentation key are each taken from
# the event's metadata field when the event includes it, otherwise from the
# event's data, and finally from the configured default. The event data is
# then appended to the matching channel after passing flow control.
#
# encoded_event is accepted but not used by this method.
def receive( event, encoded_event )
  return @logger.info { "received a LogStash::SHUTDOWN event" } if LogStash::SHUTDOWN == event
  return @logger.info { "received a LogStash::FLUSH event" } if LogStash::FLUSH == event

  data = event.to_hash

  table_id =
    if event.include?( METADATA_FIELD_TABLE_ID )
      event.sprintf( "%{#{METADATA_FIELD_TABLE_ID}}" )
    else
      data[FIELD_TABLE_ID]
    end
  table_id ||= @default_table_id

  instrumentation_key =
    if event.include?( METADATA_FIELD_INSTRUMENTATION_KEY )
      event.sprintf( "%{#{METADATA_FIELD_INSTRUMENTATION_KEY}}" )
    else
      data[FIELD_INSTRUMENTATION_KEY]
    end
  instrumentation_key ||= @default_instrumentation_key

  # @flow_control is assigned in start
  @flow_control.pass_or_wait
  channel( instrumentation_key, table_id ) << data
end
start()
click to toggle source
# File lib/logstash/outputs/application_insights/channels.rb, line 41
# Picks up the shared Flow_control instance and starts the periodic
# forwarding thread.
#
# Returns the value of periodic_forward_events.
def start
  @flow_control = Flow_control.instance

  # launch the thread that forwards events from channels to azure storage
  periodic_forward_events
end
Private Instance Methods
create_channel( instrumentation_key, table_id )
click to toggle source
Creates and registers a channel for the given instrumentation key and table id, and returns the new channel.
# File lib/logstash/outputs/application_insights/channels.rb, line 108
# Creates a channel for the given instrumentation key and table id and
# registers it both in the lookup table and in the flat channel list.
# The whole operation runs while holding @create_semaphore.
#
# Returns the new channel.
# Raises ChannelExistError when a channel is already registered for the pair.
def create_channel( instrumentation_key, table_id )
  @create_semaphore.synchronize do
    tables = @instrumentation_key_table_id_db[instrumentation_key]
    raise ChannelExistError if tables && tables[table_id]

    # first channel for this instrumentation key: start its per-table map
    tables = ( @instrumentation_key_table_id_db[instrumentation_key] ||= {} )

    new_channel = Channel.new( instrumentation_key, table_id )
    tables[table_id] = new_channel
    @channels << new_channel
    new_channel
  end
end
dispatch_channel( instrumentation_key, table_id )
click to toggle source
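Returns the channel already registered for the given instrumentation key and table id; raises NoChannelError when there is none.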
# File lib/logstash/outputs/application_insights/channels.rb, line 94
# Looks up the channel already registered for the given instrumentation
# key and table id.
#
# The lookup uses explicit nil checks and reads the per-key table only
# once, instead of provoking an exception on a missing entry and then
# re-reading the table inside a rescue handler.
#
# Returns the registered channel.
# Raises NoChannelError when no channel is registered for the pair, either
# because the instrumentation key is unknown or because it has no entry
# for the table id.
def dispatch_channel( instrumentation_key, table_id )
  tables = @instrumentation_key_table_id_db[instrumentation_key]
  channel = tables && tables[table_id]
  raise NoChannelError unless channel
  channel
end