class LogStash::Inputs::Amqp

Constants

MQTYPES

Public Class Methods

new(url, type, config={}, &block) click to toggle source
Calls superclass method LogStash::Inputs::Base::new
# File lib/logstash/inputs/amqp.rb, line 12
def initialize(url, type, config={}, &block)
  super

  @mq = nil

  # Handle path /<vhost>/<type>/<name> or /<type>/<name>
  # vhost allowed to contain slashes
  if @url.path =~ %r{^/((.*)/)?([^/]+)/([^/]+)}
    unused, @vhost, @mqtype, @name = $~.captures
  else
    raise "amqp urls must have a path of /<type>/name or /vhost/<type>/name where <type> is #{MQTYPES.join(", ")}"
  end

  if !MQTYPES.include?(@mqtype)
    raise "Invalid type '#{@mqtype}' must be one of #{MQTYPES.join(", ")}"
  end
end

Public Instance Methods

register() click to toggle source
# File lib/logstash/inputs/amqp.rb, line 31
def register
  @logger.info("Registering input #{@url}")
  query_args = @url.query ? CGI.parse(@url.query) : {}
  amqpsettings = {
    :vhost => (@vhost or "/"),
    :host => @url.host,
    :port => (@url.port or 5672),
  }
  amqpsettings[:user] = @url.user if @url.user
  amqpsettings[:pass] = @url.password if @url.password
  amqpsettings[:logging] = query_args.include? "debug"
  queue_name = ((@urlopts["queue"].nil? or @urlopts["queue"].empty?) ? "logstash-#{@name}" : @urlopts["queue"])
  @logger.debug("Connecting with AMQP settings #{amqpsettings.inspect} to set up #{@mqtype.inspect} queue #{queue_name} on exchange #{@name.inspect}")
  @amqp = AMQP.connect(amqpsettings)
  @mq = MQ.new(@amqp)
  @target = nil

  @durable_exchange = @urlopts["durable_exchange"] ? true : false
  @durable_queue = @urlopts["durable_queue"] ? true : false
  @target = @mq.queue(queue_name, :durable => @durable_queue)
  case @mqtype
    when "fanout"
      @target.bind(@mq.fanout(@name, :durable => @durable_exchange))
    when "direct"
      @target.bind(@mq.direct(@name, :durable => @durable_exchange))
    when "topic"
      @target.bind(@mq.topic(@name, :durable => @durable_exchange))
  end # case @mqtype

  @target.subscribe(:ack => true) do |header, message|
    event = LogStash::Event.from_json(message)
    receive(event)
    header.ack
  end
end