class LogStash::Inputs::AzureEventHubs
Attributes
count[R]
event_hubs_exploded[R]
Public Class Methods
new(params)
click to toggle source
Calls superclass method
# File lib/logstash/inputs/azure_event_hubs.rb, line 296
# Builds one configuration Hash per Event Hub ("explodes" the settings) into
# @event_hubs_exploded, hands the plugin-level params to the superclass, and
# then validates every per-hub configuration.
#
# Two modes are handled:
# * advanced (params['config_mode'] == 'advanced'): params['event_hubs'] is a
#   list of { hub_name => { option => value } } Hashes.
# * basic (anything else): params['event_hub_connections'] holds connection
#   strings, and the hub name is parsed out of each one.
#
# @param params [Hash] raw plugin settings keyed by String; mutated in place
#   (placeholders for required options are inserted and connection strings
#   are replaced by redacted copies)
# @raise [LogStash::ConfigurationError] when a connection string cannot be
#   parsed or a per-hub configuration fails validation
# @raise [RuntimeError] when an advanced-mode entry is not a Hash, or when two
#   hubs would share the same storage connection/container/consumer group
def initialize(params)
  # explode the all of the parameters to be scoped per event_hub
  @event_hubs_exploded = []
  # global_config will be merged into the each of the exploded configs, prefer any configuration already scoped over the globally scoped config
  global_config = {}
  params.each do |k, v|
    if !k.eql?('id') && !k.eql?('event_hubs') && !k.eql?('threads') && !k.eql?('event_hub_connections') # don't copy these to the per-event-hub configs
      global_config[k] = v
    end
  end

  if params['config_mode'] && params['config_mode'].eql?('advanced')
    params['event_hub_connections'] = ['dummy'] # trick the :required validation
    params['event_hubs'].each do |event_hub|
      raise "event_hubs must be a Hash" unless event_hub.is_a?(Hash)
      event_hub.each do |event_hub_name, config|
        config.each do |k, v|
          if 'event_hub_connection'.eql?(k) || 'storage_connection'.eql?(k) # protect from leaking logs
            config[k] = ::LogStash::Util::Password.new(v)
          end
        end
        if config['event_hub_connection'] #add the 's' to pass validation
          config['event_hub_connections'] = config['event_hub_connection']
          config.delete('event_hub_connection')
        end
        config.merge!({'event_hubs' => [event_hub_name]})
        # the block keeps the value already scoped to this hub when both sides define a key
        config.merge!(global_config) {|k, v1, v2| v1}
        @event_hubs_exploded << config
      end
    end
  else # basic config
    params['event_hubs'] = ['dummy'] # trick the :required validation
    if params['event_hub_connections']
      connections = *params['event_hub_connections'] # ensure array
      # The redacted value is written back by index below. If the setting was
      # not an Array (e.g. a single String), indexing it would call String#[]=
      # and splice text into the secret instead of replacing it, so store the
      # Array form first. An Array supplied by the caller is left as the same
      # object and updated in place.
      params['event_hub_connections'] = connections.dup unless params['event_hub_connections'].is_a?(Array)
      connections.each.with_index do |_connection, i|
        begin
          connection = self.class.replace_placeholders(_connection) if self.class.respond_to? 'replace_placeholders' # 6.x
          connection = self.class.replace_env_placeholders(_connection) if self.class.respond_to? 'replace_env_placeholders' # 5.x
          event_hub_name = ConnectionStringBuilder.new(connection).getEventHubName
          # Mask everything up to the next ';'. The key may contain '/', which
          # the previous character class ([0-9a-zA-Z=+]) did not cover, so the
          # part of the key after the first '/' was left in clear text.
          redacted_connection = connection.gsub(/(SharedAccessKey=)([^;]*)([;]*)(.*)/, '\\1<redacted>\\3\\4')
          params['event_hub_connections'][i] = redacted_connection # protect from leaking logs
          raise "invalid Event Hub name" unless event_hub_name
        rescue
          # redacted_connection is still nil here when the failure happened before the redaction step
          raise LogStash::ConfigurationError, "Error parsing event hub string name for connection: '#{redacted_connection}' please ensure that the connection string contains the EntityPath"
        end
        @event_hubs_exploded << {'event_hubs' => [event_hub_name]}.merge({'event_hub_connections' => [::LogStash::Util::Password.new(connection)]}).merge(global_config) {|k, v1, v2| v1}
      end
    end
  end

  super(params)

  container_consumer_groups = []
  # explicitly validate all the per event hub configs
  @event_hubs_exploded.each do |event_hub|
    if !self.class.validate(event_hub)
      raise LogStash::ConfigurationError, I18n.t("logstash.runner.configuration.invalid_plugin_settings")
    end
    # one { storage connection + container => consumer group } entry per hub that uses storage;
    # the container falls back to the hub name when none is configured
    container_consumer_groups << {event_hub['storage_connection'].value.to_s + (event_hub['storage_container'] ? event_hub['storage_container'] : event_hub['event_hubs'][0]) => event_hub['consumer_group']} if event_hub['storage_connection']
  end
  # identical entries mean two hubs would write to the same location with the same consumer group
  raise "The configuration will result in overwriting offsets. Please ensure that the each Event Hub's consumer_group is using a unique storage container." if container_consumer_groups.size > container_consumer_groups.uniq.size
end
Public Instance Methods
register()
click to toggle source
# File lib/logstash/inputs/azure_event_hubs.rb, line 364
# Augments every exploded per-Event-Hub configuration with the plugin-level
# settings held in @config, then logs the result at debug level.
#
# The 'id' and 'event_hubs' keys are never copied. A setting is copied only
# when the hub has no value for it (nil). An explicit `false` on the hub is a
# real value and is kept; the previous truthiness check (`unless
# event_hub[key]`) overwrote it with the plugin-level value.
def register
  # augment the exploded config with the defaults
  @event_hubs_exploded.each do |event_hub|
    @config.each do |key, value|
      next if key.eql?('id') || key.eql?('event_hubs')
      event_hub[key] = value if event_hub[key].nil?
    end
  end
  @logger.debug("Exploded Event Hub configuration.", :event_hubs_exploded => @event_hubs_exploded.to_s)
end
run(queue)
click to toggle source
# File lib/logstash/inputs/azure_event_hubs.rb, line 376
# Starts one Ruby thread per exploded Event Hub configuration. Each thread
# builds an EventProcessorHost, registers a processor factory that receives
# `queue`, and then waits on the resulting future chain until the plugin is
# stopped. The calling thread sleeps until stop? is true, joins the worker
# threads and finally shuts the shared scheduled executor down.
#
# The initial position (and the look-back value) is taken from the per-hub
# configuration, falling back to the plugin-wide setting when the hub has
# none. Previously only the plugin-wide instance variables were consulted, so
# a position configured on an individual hub had no effect.
#
# @param queue the object handed to the ProcessorFactory; presumably the
#   Logstash pipeline queue (not visible from this method)
def run(queue)
  event_hub_threads = []
  named_thread_factory = LogStash::Inputs::Azure::NamedThreadFactory.new("azure_event_hubs-worker", @id)
  scheduled_executor_service = Executors.newScheduledThreadPool(@threads, named_thread_factory)
  @event_hubs_exploded.each do |event_hub|
    event_hub_threads << Thread.new do
      event_hub_name = event_hub['event_hubs'].first # there will always only be 1 from @event_hubs_exploded
      @logger.info("Event Hub #{event_hub_name} is initializing... ")
      begin
        if event_hub['storage_connection']
          # Same fallback rule as the uniqueness check in initialize: use the
          # hub name whenever no container value is set (absent or nil).
          storage_container = event_hub['storage_container'] || event_hub_name
          event_processor_host = EventProcessorHost.new(
              EventProcessorHost.createHostName('logstash'),
              event_hub_name,
              event_hub['consumer_group'],
              event_hub['event_hub_connections'].first.value, #there will only be one in this array by the time it gets here
              event_hub['storage_connection'].value,
              storage_container,
              scheduled_executor_service)
        else
          @logger.warn("You have NOT specified a `storage_connection_string` for #{event_hub_name}. This configuration is only supported for a single Logstash instance.")
          checkpoint_manager = InMemoryCheckpointManager.new
          lease_manager = InMemoryLeaseManager.new
          event_processor_host = EventProcessorHost.new(
              EventProcessorHost.createHostName('logstash'),
              event_hub_name,
              event_hub['consumer_group'],
              event_hub['event_hub_connections'].first.value, #there will only be one in this array by the time it gets here
              checkpoint_manager,
              lease_manager,
              scheduled_executor_service,
              nil)
          #using java_send to avoid naming conflicts with 'initialize' method
          lease_manager.java_send :initialize, [HostContext], event_processor_host.getHostContext
          checkpoint_manager.java_send :initialize, [HostContext], event_processor_host.getHostContext
        end

        options = EventProcessorOptions.new
        options.setMaxBatchSize(max_batch_size)
        options.setPrefetchCount(prefetch_count)
        options.setReceiveTimeOut(Duration.ofSeconds(receive_timeout))
        options.setExceptionNotification(LogStash::Inputs::Azure::ErrorNotificationHandler.new)

        # prefer the hub-scoped setting, fall back to the plugin-wide one
        initial_position = event_hub['initial_position'] || @initial_position
        initial_position_look_back = event_hub['initial_position_look_back'] || @initial_position_look_back
        case initial_position
        when 'beginning'
          msg = "Configuring Event Hub #{event_hub_name} to read events all events."
          @logger.debug("If this is the initial read... " + msg) if event_hub['storage_connection']
          @logger.info(msg) unless event_hub['storage_connection']
          options.setInitialPositionProvider(EventProcessorOptions::StartOfStreamInitialPositionProvider.new(options))
        when 'end'
          msg = "Configuring Event Hub #{event_hub_name} to read only new events."
          @logger.debug("If this is the initial read... " + msg) if event_hub['storage_connection']
          @logger.info(msg) unless event_hub['storage_connection']
          options.setInitialPositionProvider(EventProcessorOptions::EndOfStreamInitialPositionProvider.new(options))
        when 'look_back'
          msg = "Configuring Event Hub #{event_hub_name} to read events starting at 'now - #{initial_position_look_back}' seconds."
          @logger.debug("If this is the initial read... " + msg) if event_hub['storage_connection']
          @logger.info(msg) unless event_hub['storage_connection']
          options.setInitialPositionProvider(LogStash::Inputs::Azure::LookBackPositionProvider.new(initial_position_look_back))
        end

        event_processor_host.registerEventProcessorFactory(LogStash::Inputs::Azure::ProcessorFactory.new(queue, event_hub['codec'], event_hub['checkpoint_interval'], self.method(:decorate), event_hub['decorate_events']), options)
            .whenComplete {|x, e|
              @logger.info("Event Hub registration complete. ", :event_hub_name => event_hub_name )
              @logger.error("Event Hub failure while registering.", :event_hub_name => event_hub_name, :exception => e, :backtrace => e.backtrace) if e
            }
            .then_accept {|x|
              @logger.info("Event Hub is processing events... ", :event_hub_name => event_hub_name )
              # this blocks the completable future chain from progressing, actual work is done via the executor service
              while !stop?
                Stud.stoppable_sleep(1) {stop?}
              end
            }
            .thenCompose {|x|
              @logger.info("Unregistering Event Hub this can take a while... ", :event_hub_name => event_hub_name )
              event_processor_host.unregisterEventProcessor
            }
            .exceptionally {|e|
              @logger.error("Event Hub encountered an error.", :event_hub_name => event_hub_name , :exception => e, :backtrace => e.backtrace) if e
              nil
            }
            .get # this blocks till all of the futures are complete.
        @logger.info("Event Hub #{event_hub_name} is closed.")
      rescue => e
        @logger.error("Event Hub failed during initialization.", :event_hub_name => event_hub_name, :exception => e, :backtrace => e.backtrace) if e
        # a failure in any one hub stops the whole input
        do_stop
      end
    end
  end

  # this blocks the input from existing. (all work is being done in threads)
  while !stop?
    Stud.stoppable_sleep(1) {stop?}
  end

  # This blocks the input till all the threads have run to completion.
  event_hub_threads.each do |thread|
    thread.join
  end

  # Ensure proper shutdown of executor service.
  # Note - this causes a harmless warning in the logs that scheduled tasks are being rejected.
  scheduled_executor_service.shutdown
  begin
    scheduled_executor_service.awaitTermination(10, TimeUnit::MINUTES);
  rescue => e
    @logger.debug("interrupted while waiting to close executor service, this can generally be ignored", :exception => e, :backtrace => e.backtrace) if e
  end
end