class LogStash::Inputs::Stomp

Creates events from messages received over the STOMP protocol.

Attributes

client[RW]

Public Instance Methods

new_client()
# File lib/logstash/inputs/stomp.rb, line 70
# Builds a new OnStomp client for the configured broker.
#
# The broker URI is assembled from @host and @port; @user is passed as the
# login and @password.value as the passcode. Only the client object is
# constructed here; this method does not call connect on it.
#
# Returns the newly created OnStomp::Client.
def new_client
  broker_uri = "stomp://#{@host}:#{@port}"
  credentials = { :login => @user, :passcode => @password.value }
  OnStomp::Client.new(broker_uri, credentials)
end
register()
# File lib/logstash/inputs/stomp.rb, line 62
# Prepares the plugin for use: loads the onstomp library, creates the client
# and records a descriptive URL for the configured destination.
#
# Side effects: sets @client (and its host when @vhost is configured) and
# @stomp_url.
def register
  # Loaded lazily so the library is only needed once the plugin is registered.
  require "onstomp"

  @client = new_client
  if @vhost
    # Override the host on the client with the configured virtual host.
    @client.host = @vhost
  end

  @stomp_url = format("stomp://%s:%s@%s:%s/%s", @user, @password, @host, @port, @destination)
end
run(output_queue)
# File lib/logstash/inputs/stomp.rb, line 97
# Connects to the STOMP server and feeds received events into output_queue.
#
# output_queue - object that decoded events are appended to with <<.
#
# Also registers a handler on the client that reconnects and re-subscribes
# when the connection is closed, unless the plugin has been asked to stop.
#
# Returns whatever subscription_handler returns.
def run(output_queue)
  # Store the queue before any connection is opened: subscription_handler
  # pushes events to @output_queue, and the close handler below may invoke it
  # as soon as a connection exists.
  @output_queue = output_queue

  # Handle disconnects
  @client.on_connection_closed do
    # Do not reconnect once a stop was requested; stop itself disconnects the
    # client, which presumably fires this callback as well (confirm against
    # the onstomp event documentation).
    unless stop?
      # connect is a private method, so it must be called without an explicit
      # receiver: self.connect raises NoMethodError on Ruby versions before 2.7.
      connect
      subscription_handler # is required for re-subscribing to the destination
    end
  end

  connect
  subscription_handler
end
stop()
# File lib/logstash/inputs/stomp.rb, line 109
# Disconnects the client, but only when a client exists and it reports an
# active connection.
#
# Returns the result of the disconnect call, or nil when nothing was done.
def stop
  return unless @client
  return unless @client.connected?

  @client.disconnect
end

Private Instance Methods

connect()
# File lib/logstash/inputs/stomp.rb, line 44
# Opens the connection on @client and logs the outcome.
#
# On a connect failure, an unsupported protocol version, or a refused
# connection, the attempt is repeated after @reconnect_interval seconds for
# as long as @reconnect is set and the plugin has not been asked to stop.
# Otherwise the failure is logged and the method returns without raising.
# Other exception types are not rescued here.
def connect
  @client.connect
  @logger.info("Connected to stomp server") if @client.connected?
rescue OnStomp::ConnectFailedError, OnStomp::UnsupportedProtocolVersionError, Errno::ECONNREFUSED => error
  if @reconnect && !stop?
    @logger.warn("Failed to connect to stomp server. Retry in #{@reconnect_interval} seconds. #{error.inspect}")
    @logger.debug("#{error.backtrace.join("\n")}") if @debug
    sleep @reconnect_interval
    # Re-run the method body from the top; there is no cap on the number of attempts.
    retry
  else
    @logger.warn("Failed to connect to stomp server. Exiting with error: #{error.inspect}")
    @logger.debug("#{error.backtrace.join("\n")}") if @debug
  end
end
subscription_handler()
# File lib/logstash/inputs/stomp.rb, line 75
# Subscribes to @destination and forwards every decoded event to
# @output_queue, then blocks until the plugin is asked to stop.
#
# Does nothing when the client is not connected. Returns nil.
def subscription_handler
  # Nothing to subscribe to without an active connection.
  return unless @client.connected?

  @client.subscribe(@destination) do |frame|
    @codec.decode(frame.body) do |decoded|
      decorate(decoded)
      @output_queue << decoded
    end
  end

  # When Stomp inputs are the only plugin instances, the process would end
  # prematurely: the subscription above returns control to the 'run' method,
  # after which "run_input" in agent.rb marks 'done' as 'true' and calls
  # 'finish' on this plugin instance. Sleeping here keeps the instance alive
  # until a stop is requested.
  sleep 1 until stop?
end