class LogStash::Outputs::Application_insights::Channels

Public Class Methods

instance() click to toggle source
# File lib/logstash/outputs/application_insights/channels.rb, line 139
def self.instance
  @@instance
end

Private Class Methods

new() click to toggle source
# File lib/logstash/outputs/application_insights/channels.rb, line 27
def initialize
  configuration = Config.current

  @logger = configuration[:logger]

  @instrumentation_key_table_id_db = {}
  @channels = [  ]
  @create_semaphore = Mutex.new

  @default_instrumentation_key = configuration[:instrumentation_key]
  @default_table_id = configuration[:table_id]
  @tables = configuration[:tables]
end

Public Instance Methods

channel( instrumentation_key, table_id ) click to toggle source
# File lib/logstash/outputs/application_insights/channels.rb, line 65
def channel ( instrumentation_key, table_id )
  begin
    dispatch_channel( instrumentation_key, table_id )

  rescue NoChannelError
    begin
      create_channel( instrumentation_key, table_id )
    rescue ChannelExistError # can happen due to race conditions
      dispatch_channel( instrumentation_key, table_id )
    end
  end
end
close() click to toggle source
# File lib/logstash/outputs/application_insights/channels.rb, line 121
def close
  @channels.each do |channel|
    channel.close
  end
end
mark_invalid_instrumentation_key( instrumentation_key ) click to toggle source
# File lib/logstash/outputs/application_insights/channels.rb, line 127
def mark_invalid_instrumentation_key ( instrumentation_key )
  # TODO should go to lost and found container
end
mark_invalid_table_id( table_id ) click to toggle source
# File lib/logstash/outputs/application_insights/channels.rb, line 131
def mark_invalid_table_id ( table_id )
  # TODO should go to lost and found container
end
periodic_forward_events() click to toggle source
# File lib/logstash/outputs/application_insights/channels.rb, line 79
def periodic_forward_events
  Thread.new do
    loop do
      sleep( 0.5 )
      channels = @create_semaphore.synchronize { @channels.dup }
      channels.each do |channel|
        channel.flush
      end
    end
  end
end
receive( event, encoded_event ) click to toggle source
# File lib/logstash/outputs/application_insights/channels.rb, line 48
def receive ( event, encoded_event )
  if LogStash::SHUTDOWN == event
    @logger.info { "received a LogStash::SHUTDOWN event" }

  elsif LogStash::FLUSH == event
    @logger.info { "received a LogStash::FLUSH event" }
  else
    data = event.to_hash
    table_id = ( event.include?( METADATA_FIELD_TABLE_ID ) ? event.sprintf( "%{#{METADATA_FIELD_TABLE_ID}}" ) : data[FIELD_TABLE_ID] ) || @default_table_id
    instrumentation_key = ( event.include?( METADATA_FIELD_INSTRUMENTATION_KEY ) ? event.sprintf( "%{#{METADATA_FIELD_INSTRUMENTATION_KEY}}" ) : data[FIELD_INSTRUMENTATION_KEY] ) || @default_instrumentation_key

    @flow_control.pass_or_wait
    channel( instrumentation_key, table_id ) << data
  end
end
start() click to toggle source
# File lib/logstash/outputs/application_insights/channels.rb, line 41
def start
  @flow_control = Flow_control.instance
  # launch tread that forward events from channels to azure storage
  periodic_forward_events
end

Private Instance Methods

create_channel( instrumentation_key, table_id ) click to toggle source

return channel

# File lib/logstash/outputs/application_insights/channels.rb, line 108
def create_channel ( instrumentation_key, table_id )
  @create_semaphore.synchronize {      
    raise ChannelExistError if @instrumentation_key_table_id_db[instrumentation_key] && @instrumentation_key_table_id_db[instrumentation_key][table_id]
    @instrumentation_key_table_id_db[instrumentation_key] ||= {}
    channel = Channel.new( instrumentation_key, table_id )
    @instrumentation_key_table_id_db[instrumentation_key][table_id] = channel
    @channels << channel
    channel
  }
end
dispatch_channel( instrumentation_key, table_id ) click to toggle source

return channel

# File lib/logstash/outputs/application_insights/channels.rb, line 94
def dispatch_channel ( instrumentation_key, table_id )
  begin
    channel = @instrumentation_key_table_id_db[instrumentation_key][table_id]
    channel.instrumentation_key     # don't remove it, it is to emit an exception in case channel not created yet'
    channel
  rescue => e
    raise NoChannelError if @instrumentation_key_table_id_db[instrumentation_key].nil? || @instrumentation_key_table_id_db[instrumentation_key][table_id].nil?
    @logger.error { "Channel dispatch failed - error: #{e.inspect}" }
    raise e
  end 
end