class LogStash::Inputs::AzureEventHubs

Attributes

count[R]
event_hubs_exploded[R]

Public Class Methods

new(params)
Calls superclass method
# File lib/logstash/inputs/azure_event_hubs.rb, line 296
# Splits the plugin parameters into one fully scoped config hash per Event Hub
# (collected in @event_hubs_exploded), then runs the regular plugin initialization
# and explicitly validates every per-Event Hub config.
#
# params - the raw plugin settings (String keys). Two layouts are supported:
#   * config_mode 'advanced': `event_hubs` is a list of {event_hub_name => settings}
#     hashes, each carrying its own `event_hub_connection`.
#   * basic (anything else): `event_hub_connections` is one or more connection strings
#     whose EntityPath names the Event Hub.
#
# Note: params is mutated. Dummy values are inserted to satisfy :required validation,
# and connection secrets are wrapped in Password objects or redacted so they do not
# leak into logs.
#
# Raises LogStash::ConfigurationError when a connection string cannot be parsed or a
# per-Event Hub config fails validation. Raises RuntimeError when an `event_hubs` entry
# is not a Hash, or when two Event Hubs would share the same storage container +
# consumer group (and therefore overwrite each other's offsets).
def initialize(params)
  @event_hubs_exploded = []

  # global_config is merged into each exploded config. Any value already scoped to an
  # individual Event Hub takes precedence over the globally scoped one. These keys are
  # either set per Event Hub below or must stay plugin-scoped, so they are not copied.
  global_config = params.reject do |k, _v|
    k.eql?('id') || k.eql?('event_hubs') || k.eql?('threads') || k.eql?('event_hub_connections')
  end

  if params['config_mode'].eql?('advanced')
    params['event_hub_connections'] = ['dummy'] # trick the :required validation

    params['event_hubs'].each do |event_hub|
      raise "event_hubs must be a Hash" unless event_hub.is_a?(Hash)
      event_hub.each do |event_hub_name, config|
        # protect from leaking logs: Password objects redact themselves when printed
        %w[event_hub_connection storage_connection].each do |secret_key|
          config[secret_key] = ::LogStash::Util::Password.new(config[secret_key]) if config.key?(secret_key)
        end
        # add the 's' so the per-Event Hub config passes validation
        if config['event_hub_connection']
          config['event_hub_connections'] = config.delete('event_hub_connection')
        end

        config.merge!('event_hubs' => [event_hub_name])
        config.merge!(global_config) { |_key, scoped_value, _global_value| scoped_value }
        @event_hubs_exploded << config
      end
    end
  else # basic config
    params['event_hubs'] = ['dummy'] # trick the :required validation
    if params['event_hub_connections']
      connections = [*params['event_hub_connections']] # ensure array
      # Redact through an Array. For a bare String, `[i] = ...` would be String#[]=,
      # which splices the redacted text over one character and keeps the secret.
      params['event_hub_connections'] = connections.dup unless params['event_hub_connections'].is_a?(Array)

      connections.each_with_index do |raw_connection, i|
        redacted_connection = nil
        begin
          connection = raw_connection
          connection = self.class.replace_placeholders(raw_connection) if self.class.respond_to? 'replace_placeholders' # 6.x
          connection = self.class.replace_env_placeholders(raw_connection) if self.class.respond_to? 'replace_env_placeholders' # 5.x
          # Redact before parsing so the error below can name the connection without the key.
          # The key is Base64 (may contain '/', '+', '='), so mask everything up to the next ';'.
          redacted_connection = connection.to_s.gsub(/(SharedAccessKey=)[^;]*/, '\1<redacted>')
          params['event_hub_connections'][i] = redacted_connection # protect from leaking logs
          event_hub_name = ConnectionStringBuilder.new(connection).getEventHubName
          raise "invalid Event Hub name" unless event_hub_name
        rescue
          raise LogStash::ConfigurationError, "Error parsing event hub string name for connection: '#{redacted_connection}' please ensure that the connection string contains the EntityPath"
        end
        exploded = {
          'event_hubs' => [event_hub_name],
          'event_hub_connections' => [::LogStash::Util::Password.new(connection)]
        }
        @event_hubs_exploded << exploded.merge(global_config) { |_key, scoped_value, _global_value| scoped_value }
      end
    end
  end

  super(params)

  container_consumer_groups = []
  # explicitly validate all the per event hub configs
  @event_hubs_exploded.each do |event_hub|
    unless self.class.validate(event_hub)
      raise LogStash::ConfigurationError, I18n.t("logstash.runner.configuration.invalid_plugin_settings")
    end
    next unless event_hub['storage_connection']
    # Offsets are keyed by storage connection + container. The container defaults to the
    # Event Hub name.
    container = event_hub['storage_container'] || event_hub['event_hubs'][0]
    container_consumer_groups << {event_hub['storage_connection'].value.to_s + container => event_hub['consumer_group']}
  end
  raise "The configuration will result in overwriting offsets. Please ensure that the each Event Hub's consumer_group is using a unique storage container." if container_consumer_groups.size > container_consumer_groups.uniq.size
end

Public Instance Methods

register()
# File lib/logstash/inputs/azure_event_hubs.rb, line 364
# Completes every exploded per-Event Hub config with the plugin-level settings in
# @config (user-supplied values plus defaults). `id` and `event_hubs` are never copied.
#
# Only settings that are nil/absent in a per-Event Hub config are filled in. A scoped
# value of `false` (e.g. `decorate_events => false` for one Event Hub) is a real value
# and must not be replaced by the plugin-level one.
def register
  @event_hubs_exploded.each do |event_hub|
    @config.each do |key, value|
      next if key.eql?('id') || key.eql?('event_hubs')
      event_hub[key] = value if event_hub[key].nil?
    end
  end
  @logger.debug("Exploded Event Hub configuration.",  :event_hubs_exploded => @event_hubs_exploded.to_s)
end
run(queue)
# File lib/logstash/inputs/azure_event_hubs.rb, line 376
# Starts one worker thread per exploded Event Hub config.
#
# Each worker does the following:
#   * Builds an EventProcessorHost. Offsets go to Azure storage when `storage_connection`
#     is set; otherwise in-memory checkpoint and lease managers are used.
#   * Configures the EventProcessorOptions from that Event Hub's settings.
#   * Registers a ProcessorFactory that pushes events to `queue`.
#   * Keeps the registration alive until the plugin is stopped, then unregisters the
#     processor.
#
# queue - the queue handed to ProcessorFactory for emitted events.
#
# Blocks until stop? is true and every worker thread has finished. The shared scheduled
# executor is always shut down on the way out, even if waiting or joining raises.
def run(queue)
  event_hub_threads = []
  named_thread_factory = LogStash::Inputs::Azure::NamedThreadFactory.new("azure_event_hubs-worker", @id)
  scheduled_executor_service = Executors.newScheduledThreadPool(@threads, named_thread_factory)
  begin
    @event_hubs_exploded.each do |event_hub|
      event_hub_threads << Thread.new do
        event_hub_name = event_hub['event_hubs'].first # there will always only be 1 from @event_hubs_exploded
        @logger.info("Event Hub #{event_hub_name} is initializing... ")
        begin
          if event_hub['storage_connection']
            event_processor_host = EventProcessorHost.new(
                EventProcessorHost.createHostName('logstash'),
                event_hub_name,
                event_hub['consumer_group'],
                event_hub['event_hub_connections'].first.value, # there will only be one in this array by the time it gets here
                event_hub['storage_connection'].value,
                event_hub.fetch('storage_container', event_hub_name),
                scheduled_executor_service)
          else
            @logger.warn("You have NOT specified a `storage_connection` for #{event_hub_name}. This configuration is only supported for a single Logstash instance.")
            checkpoint_manager = InMemoryCheckpointManager.new
            lease_manager = InMemoryLeaseManager.new
            event_processor_host = EventProcessorHost.new(
                EventProcessorHost.createHostName('logstash'),
                event_hub_name,
                event_hub['consumer_group'],
                event_hub['event_hub_connections'].first.value, # there will only be one in this array by the time it gets here
                checkpoint_manager,
                lease_manager,
                scheduled_executor_service,
                nil)
            # using java_send to avoid naming conflicts with the Ruby 'initialize' method
            lease_manager.java_send :initialize, [HostContext], event_processor_host.getHostContext
            checkpoint_manager.java_send :initialize, [HostContext], event_processor_host.getHostContext
          end

          # Per-Event Hub settings (advanced config) take precedence. The `||` fallbacks
          # use the plugin-level values when a scoped value is missing.
          options = EventProcessorOptions.new
          options.setMaxBatchSize(event_hub['max_batch_size'] || max_batch_size)
          options.setPrefetchCount(event_hub['prefetch_count'] || prefetch_count)
          options.setReceiveTimeOut(Duration.ofSeconds(event_hub['receive_timeout'] || receive_timeout))
          options.setExceptionNotification(LogStash::Inputs::Azure::ErrorNotificationHandler.new)

          initial_position = event_hub['initial_position'] || @initial_position
          initial_position_look_back = event_hub['initial_position_look_back'] || @initial_position_look_back
          # With durable (storage) checkpoints the initial position only applies to a first
          # read, so it is logged at debug level. Otherwise it applies every time.
          log_initial_position = lambda do |msg|
            if event_hub['storage_connection']
              @logger.debug("If this is the initial read... " + msg)
            else
              @logger.info(msg)
            end
          end
          case initial_position
          when 'beginning'
            log_initial_position.call("Configuring Event Hub #{event_hub_name} to read all events.")
            options.setInitialPositionProvider(EventProcessorOptions::StartOfStreamInitialPositionProvider.new(options))
          when 'end'
            log_initial_position.call("Configuring Event Hub #{event_hub_name} to read only new events.")
            options.setInitialPositionProvider(EventProcessorOptions::EndOfStreamInitialPositionProvider.new(options))
          when 'look_back'
            log_initial_position.call("Configuring Event Hub #{event_hub_name} to read events starting at 'now - #{initial_position_look_back}' seconds.")
            options.setInitialPositionProvider(LogStash::Inputs::Azure::LookBackPositionProvider.new(initial_position_look_back))
          end

          event_processor_host.registerEventProcessorFactory(LogStash::Inputs::Azure::ProcessorFactory.new(queue, event_hub['codec'], event_hub['checkpoint_interval'], self.method(:decorate), event_hub['decorate_events']), options)
              .whenComplete {|x, e|
                if e
                  @logger.error("Event Hub failure while registering.", :event_hub_name => event_hub_name, :exception => e, :backtrace => e.backtrace)
                else
                  @logger.info("Event Hub registration complete. ", :event_hub_name => event_hub_name )
                end
              }
              .then_accept {|x|
                @logger.info("Event Hub is processing events... ", :event_hub_name => event_hub_name )
                # this blocks the completable future chain from progressing, actual work is done via the executor service
                until stop?
                  Stud.stoppable_sleep(1) { stop? }
                end
              }
              .thenCompose {|x|
                @logger.info("Unregistering Event Hub this can take a while... ", :event_hub_name => event_hub_name )
                event_processor_host.unregisterEventProcessor
              }
              .exceptionally {|e|
                @logger.error("Event Hub encountered an error.", :event_hub_name => event_hub_name , :exception => e, :backtrace => e.backtrace) if e
                nil
              }
              .get # this blocks till all of the futures are complete.
          @logger.info("Event Hub #{event_hub_name} is closed.")
        rescue => e
          @logger.error("Event Hub failed during initialization.", :event_hub_name => event_hub_name, :exception => e, :backtrace => e.backtrace)
          do_stop
        end
      end
    end

    # this blocks the input from exiting (all work is being done in threads)
    until stop?
      Stud.stoppable_sleep(1) { stop? }
    end

    # This blocks the input till all the threads have run to completion.
    event_hub_threads.each(&:join)
  ensure
    # Shut the executor down even if waiting/joining above raised, so it is not leaked.
    # Note - this causes a harmless warning in the logs that scheduled tasks are being rejected.
    scheduled_executor_service.shutdown
    begin
      scheduled_executor_service.awaitTermination(10, TimeUnit::MINUTES)
    rescue => e
      @logger.debug("interrupted while waiting to close executor service, this can generally be ignored", :exception => e, :backtrace => e.backtrace)
    end
  end
end