class LogStash::Outputs::AzureEventHubs
Output plugin to send events to an Azure Event Hub.
Public Instance Methods
close()
click to toggle source
# File lib/logstash/outputs/azure_event_hubs.rb, line 137 def close @eventhub_client.closeSync(); @executor_service.shutdown(); end
receive(event)
click to toggle source
# File lib/logstash/outputs/azure_event_hubs.rb, line 143 def receive(event) begin @codec.encode(event) rescue => e @logger.warn("Error encoding event", :exception => e, :event => event) end end
register()
click to toggle source
# File lib/logstash/outputs/azure_event_hubs.rb, line 78 def register # Build Event Hubs client connection string connection_builder = ConnectionStringBuilder.new() connection_builder.setNamespaceName(@service_namespace) connection_builder.setEventHubName(@event_hub) connection_builder.setSasKeyName(@sas_key_name) connection_builder.setSasKey(@sas_key) # Configure for national cloud if (!@service_domain.nil?) connection_builder.setEndpoint(@service_namespace, @service_domain) end # The Executor handles all the asynchronous tasks and this is passed to the EventHubClient. # The gives the user control to segregate their thread pool based on the work load. # This pool can then be shared across multiple EventHubClient instances. @executor_service = Executors.newScheduledThreadPool(@client_threads) # Handle Transient errors when creating the Event Hubs Client try = 0 retry_interval = 2 begin # Each EventHubClient instance spins up a new TCP/SSL connection, which is expensive. # It is always a best practice to reuse these instances. @eventhub_client = EventHubClient.createSync(connection_builder.toString(), @executor_service) rescue EventHubException, ExecutionException, InterruptedException, IOException => e # Log error, no retry if e.getIsTransient() != true or e == IOException or try >= @connection_retry_count @logger.error( "Unable to establish connection to Azure Event Hubs.", :error_message => e.getMessage(), :class => e.class.name ) close() exit(1) end # Log error with retry @logger.error( "Connection to Event Hubs failed, will attempt connection again.", :error_message => e.getMessage(), :class => e.class.name, :retry_in_seconds => retry_interval ) # Wait for interval sleep(retry_interval) # Add attempt and retry try += 1 retry end # Pass event to sender on encode @codec.on_event(&method(:send_record)) end
Private Instance Methods
send_record(event, payload)
click to toggle source
# File lib/logstash/outputs/azure_event_hubs.rb, line 152 def send_record(event, payload) begin # Create EventData object and convert payload to bytes eh_event = EventData.create(ByteBuffer::wrap(payload.to_java_bytes)) # Add property bag if (!@properties_bag.nil?) @properties_bag.each do |key, value| eh_event.getProperties().put(event.sprintf(key).to_java_string, event.sprintf(value).to_java_string) end end # Send using client @eventhub_client.sendSync(eh_event) rescue => e @logger.warn("Error sending event", :exception => e, :event => event) end end