class LogStash::Inputs::Azuretopicthreadable

Reads events from Azure topics

Public Class Methods

new(*args) click to toggle source
Calls superclass method
# File lib/logstash/inputs/azuretopicthreadable.rb, line 25
def initialize(*args)
super(*args)
end

Public Instance Methods

process(output_queue, pid) click to toggle source
# File lib/logstash/inputs/azuretopicthreadable.rb, line 40
def process(output_queue, pid)
  # Get a new instance of a service
  if @access_key_name
      # SAS key used
      signer = Azure::ServiceBus::Auth::SharedAccessSigner.new
      sb_host = "https://#{Azure.sb_namespace}.servicebus.windows.net"
      azure_service_bus = Azure::ServiceBus::ServiceBusService.new(sb_host, { signer: signer})
  else
      # ACS key
      azure_service_bus = Azure::ServiceBus::ServiceBusService.new
  end
  while !stop?
    begin
      # check if we have a message in the subscription
      message = azure_service_bus.receive_subscription_message(@topic ,@subscription, { :peek_lock => true, :timeout => 1 } )
      if message
        # decoding returns a yield
        codec.decode(message.body) do |event|
            decorate(event)
            output_queue << event
        end
        # delete the message after reading it
        azure_service_bus.delete_subscription_message(message)
      else
        Stud.stoppable_sleep(@thread_sleep_time) { stop? } #topic is probably empty. sleep.
      end
    rescue LogStash::ShutdownSignal => e
      raise e
    rescue => e
      @logger.error("Oh My, An error occurred. Thread id:" + pid.to_s, :exception => e)
      if message and message.delivery_count > @deliverycount
        azure_service_bus.delete_subscription_message(message)
      end
      Stud.stoppable_sleep(@thread_sleep_time) { stop? }
    end
  end
end
register() click to toggle source
# File lib/logstash/inputs/azuretopicthreadable.rb, line 30
def register
  # Configure credentials
  Azure.configure do |config|
    config.sb_namespace = @namespace
    config.sb_access_key = @access_key
    config.sb_sas_key_name = @access_key_name
    config.sb_sas_key = @access_key
  end
end
run(output_queue) click to toggle source
# File lib/logstash/inputs/azuretopicthreadable.rb, line 79
def run(output_queue)
  threads = []
    (0..(@threads-1)).each do |pid|
      threads << Thread.new { process(output_queue, pid) }
    end
  threads.each { |thr| thr.join }
end
teardown() click to toggle source
# File lib/logstash/inputs/azuretopicthreadable.rb, line 88
def teardown
end