class LogStash::Inputs::Azuretopic

Reads events from Azure topics

Public Class Methods

new(*args) click to toggle source
Calls superclass method
# File lib/logstash/inputs/azuretopic.rb, line 23
def initialize(*args)
super(*args)
end

Public Instance Methods

process(output_queue) click to toggle source
# File lib/logstash/inputs/azuretopic.rb, line 46
def process(output_queue)
  message = @azure_service_bus.receive_subscription_message(@topic ,@subscription, { :peek_lock => true, :timeout => 1 } )
  if message
    codec.decode(message.body) do |event|
      decorate(event)
      output_queue << event
    end # codec.decode
    @azure_service_bus.delete_subscription_message(message)
  end
  rescue LogStash::ShutdownSignal => e
    raise e
  rescue => e
    @logger.error("Oh My, An error occurred.", :exception => e)
  if message and message.delivery_count > @deliverycount
    @azure_service_bus.delete_subscription_message(message)
  end
end
register() click to toggle source
# File lib/logstash/inputs/azuretopic.rb, line 28
def register
  Azure.configure do |config|
    config.sb_namespace = @namespace
    config.sb_access_key = @access_key
    config.sb_sas_key_name = @access_key_name
    config.sb_sas_key = @access_key
  end
  if access_key_name 
      # SAS key used
      signer = Azure::ServiceBus::Auth::SharedAccessSigner.new
      sb_host = "https://#{Azure.sb_namespace}.servicebus.windows.net"
      @azure_service_bus = Azure::ServiceBus::ServiceBusService.new(sb_host, { signer: signer})
  else
      # ACS key
      @azure_service_bus = Azure::ServiceBus::ServiceBusService.new
  end
end
run(output_queue) click to toggle source
# File lib/logstash/inputs/azuretopic.rb, line 65
def run(output_queue)
  while !stop?
    process(output_queue)
  end # loop
end
teardown() click to toggle source
# File lib/logstash/inputs/azuretopic.rb, line 72
def teardown
end