class LogStash::Outputs::AzureEventHubs

Output plugin to send events to an Azure Event Hub.

Public Instance Methods

close() click to toggle source
# File lib/logstash/outputs/azure_event_hubs.rb, line 137
def close
  @eventhub_client.closeSync();
  @executor_service.shutdown();
end
receive(event) click to toggle source
# File lib/logstash/outputs/azure_event_hubs.rb, line 143
def receive(event)
  begin
    @codec.encode(event)
  rescue => e
    @logger.warn("Error encoding event", :exception => e, :event => event)
  end
end
register() click to toggle source
# File lib/logstash/outputs/azure_event_hubs.rb, line 78
def register
  # Build Event Hubs client connection string
  connection_builder = ConnectionStringBuilder.new()
  connection_builder.setNamespaceName(@service_namespace)
  connection_builder.setEventHubName(@event_hub)
  connection_builder.setSasKeyName(@sas_key_name)
  connection_builder.setSasKey(@sas_key)

  # Configure for national cloud
  if (!@service_domain.nil?)
    connection_builder.setEndpoint(@service_namespace, @service_domain)
  end

  # The Executor handles all the asynchronous tasks and this is passed to the EventHubClient.
  # The gives the user control to segregate their thread pool based on the work load.
  # This pool can then be shared across multiple EventHubClient instances.
  @executor_service = Executors.newScheduledThreadPool(@client_threads)

  # Handle Transient errors when creating the Event Hubs Client
  try = 0
  retry_interval = 2
  begin
    # Each EventHubClient instance spins up a new TCP/SSL connection, which is expensive.
    # It is always a best practice to reuse these instances.
    @eventhub_client = EventHubClient.createSync(connection_builder.toString(), @executor_service)

  rescue EventHubException, ExecutionException, InterruptedException, IOException => e
    # Log error, no retry
    if e.getIsTransient() != true or e == IOException or try >= @connection_retry_count
      @logger.error(
        "Unable to establish connection to Azure Event Hubs.",
        :error_message => e.getMessage(),
        :class => e.class.name
        )
      close()
      exit(1)
    end

    # Log error with retry
    @logger.error(
      "Connection to Event Hubs failed, will attempt connection again.",
      :error_message => e.getMessage(),
      :class => e.class.name,
      :retry_in_seconds => retry_interval
    )

    # Wait for interval
    sleep(retry_interval)
    
    # Add attempt and retry
    try += 1
    retry
  end

  # Pass event to sender on encode
  @codec.on_event(&method(:send_record))
end

Private Instance Methods

send_record(event, payload) click to toggle source
# File lib/logstash/outputs/azure_event_hubs.rb, line 152
def send_record(event, payload)
  begin
    # Create EventData object and convert payload to bytes
    eh_event = EventData.create(ByteBuffer::wrap(payload.to_java_bytes))
    
    # Add property bag
    if (!@properties_bag.nil?)
      @properties_bag.each do |key, value|
        eh_event.getProperties().put(event.sprintf(key).to_java_string, event.sprintf(value).to_java_string)
      end
    end

    # Send using client
    @eventhub_client.sendSync(eh_event)
  rescue => e
    @logger.warn("Error sending event", :exception => e, :event => event)
  end
end