class LogStash::Inputs::RabbitMQ

Pull events from a www.rabbitmq.com/[RabbitMQ] queue.

By default, the plugin creates an entirely transient queue and listens for all messages. If you need durability or any other advanced settings, set the appropriate options.

This plugin uses the rubymarchhare.info/[March Hare] library for interacting with the RabbitMQ server. Most configuration options map directly to standard RabbitMQ and AMQP concepts. The www.rabbitmq.com/amqp-0-9-1-reference.html[AMQP 0-9-1 reference guide] and other parts of the RabbitMQ documentation are useful for deeper understanding.

The properties of received messages are stored in the `[@metadata][rabbitmq_properties]` field if the `@metadata_enabled` setting is enabled. Note that storing metadata may degrade performance. The properties that may be available (in most cases only if the sender set them) are those named in the `MESSAGE_PROPERTIES` constant.

For example, to get the RabbitMQ message's timestamp property into the Logstash event's `@timestamp` field, use the date filter to parse the `[@metadata][rabbitmq_properties][timestamp]` field:

    filter {
      if [@metadata][rabbitmq_properties][timestamp] {
        date {
          match => ["[@metadata][rabbitmq_properties][timestamp]", "UNIX"]
        }
      }
    }

Additionally, any message headers will be saved in the `[@metadata][rabbitmq_headers]` field.

Constants

INTERNAL_QUEUE_POISON
MESSAGE_PROPERTIES

The properties to extract from each message and store in a @metadata field.

Technically the exchange, redeliver, and routing-key properties belong to the envelope and not the message but we ignore that distinction here. However, we extract the headers separately via get_headers even though the header table technically is a message property.

All strings are frozen so that code modifying the event's @metadata field can't change them.

If updating this list, remember to update the documentation above too.
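The effect of freezing can be sketched as follows. The list below is a hypothetical three-element stand-in (the real `MESSAGE_PROPERTIES` value is not shown on this page), and `try_mutation` is an illustrative helper, not part of the plugin.

```ruby
# Hypothetical stand-in for the real constant; only the strings are frozen,
# as the description above states.
SAMPLE_FROZEN_NAMES = ["exchange", "routing-key", "timestamp"].map(&:freeze)

# Illustrative helper: reports whether the block managed to mutate anything.
def try_mutation
  yield
  :mutated
rescue RuntimeError # FrozenError is a RuntimeError subclass on modern Rubies
  :rejected
end

# In-place modification of a frozen string is refused.
in_place = try_mutation { SAMPLE_FROZEN_NAMES.first << "-oops" }

# Working on a copy is still possible, because dup returns an unfrozen string.
on_copy = try_mutation { SAMPLE_FROZEN_NAMES.first.dup << "-ok" }

puts in_place
puts on_copy
```

Code that needs a modified name should therefore work on a `dup` of the string rather than on the shared constant.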

Public Instance Methods

bind_exchange!()
# File lib/logstash/inputs/rabbitmq.rb, line 194
def bind_exchange!
  if @exchange
    if @exchange_type # Only declare the exchange if @exchange_type is set!
      @logger.info? && @logger.info("Declaring exchange '#{@exchange}' with type #{@exchange_type}")
      @hare_info.exchange = declare_exchange!(@hare_info.channel, @exchange, @exchange_type, @durable)
    end
    @hare_info.queue.bind(@exchange, :routing_key => @key)
  end
end
consume!()
# File lib/logstash/inputs/rabbitmq.rb, line 217
def consume!
  @consumer = @hare_info.queue.build_consumer(:on_cancellation => Proc.new { on_cancellation }) do |metadata, data|
    @internal_queue.put [metadata, data]
  end

  begin
    @hare_info.queue.subscribe_with(@consumer, :manual_ack => @ack)
  rescue MarchHare::Exception => e
    @logger.warn("Could not subscribe to queue! Will retry in #{@subscription_retry_interval_seconds} seconds", :queue => @queue)

    sleep @subscription_retry_interval_seconds
    retry
  end

  internal_queue_consume!
end
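The subscribe step in `consume!` relies on Ruby's `begin`/`rescue`/`retry` idiom. A minimal sketch of that idiom, runnable without a broker, might look like the following. `FakeBrokerError` and `subscribe_with_retry` are hypothetical names standing in for `MarchHare::Exception` and the plugin's own logic, and unlike `consume!` this sketch caps the number of attempts.

```ruby
# Hypothetical error class standing in for MarchHare::Exception.
class FakeBrokerError < StandardError; end

# Runs the block until it succeeds, sleeping between failed attempts.
# Assumption: a cap on attempts, which the plugin itself does not apply.
def subscribe_with_retry(interval_seconds:, max_attempts:)
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue FakeBrokerError
    raise if attempts >= max_attempts # give up and re-raise the last error
    sleep interval_seconds
    retry # re-runs the begin block from the top
  end
end

calls = []
result = subscribe_with_retry(interval_seconds: 0.01, max_attempts: 5) do |attempt|
  calls << attempt
  raise FakeBrokerError, "queue not ready" if attempt < 3
  :subscribed
end

puts result
puts calls.inspect
```

Because `attempts` is declared outside the `begin` block, it survives each `retry` and can be used to bound the loop.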
declare_queue()
# File lib/logstash/inputs/rabbitmq.rb, line 208
def declare_queue
  @hare_info.channel.queue(@queue,
                           :durable     => @durable,
                           :auto_delete => @auto_delete,
                           :exclusive   => @exclusive,
                           :passive     => @passive,
                           :arguments   => @arguments)
end
declare_queue!()
# File lib/logstash/inputs/rabbitmq.rb, line 204
def declare_queue!
  @hare_info.queue = declare_queue()
end
internal_queue_consume!()
# File lib/logstash/inputs/rabbitmq.rb, line 234
def internal_queue_consume!
  i=0
  last_delivery_tag=nil
  while true
    payload = @internal_queue.poll(10, TimeUnit::MILLISECONDS)
    if !payload  # Nothing in the queue
      if last_delivery_tag # And we have unacked stuff
        @hare_info.channel.ack(last_delivery_tag, true) if @ack
        i=0
        last_delivery_tag = nil
      end
      next
    end

    break if payload == INTERNAL_QUEUE_POISON

    metadata, data = payload
    @codec.decode(data) do |event|
      decorate(event)
      if @metadata_enabled
        event.set("[@metadata][rabbitmq_headers]", get_headers(metadata))
        event.set("[@metadata][rabbitmq_properties]", get_properties(metadata))
      end
      @output_queue << event if event
    end

    i += 1

    if i >= @prefetch_count
      @hare_info.channel.ack(metadata.delivery_tag, true) if @ack
      i = 0
      last_delivery_tag = nil
    else
      last_delivery_tag = metadata.delivery_tag
    end
  end
end
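The acknowledgement strategy in `internal_queue_consume!` (acknowledge once per `@prefetch_count` messages, and flush any outstanding acknowledgement when the queue goes idle) can be simulated without a broker. In this sketch `RecordingChannel` and `ack_in_batches` are hypothetical names, and a `nil` entry in the input stands for a poll that timed out with nothing in the queue.

```ruby
# Hypothetical stand-in for the March Hare channel: it only records acks.
class RecordingChannel
  attr_reader :acks

  def initialize
    @acks = []
  end

  def ack(delivery_tag, multiple)
    @acks << [delivery_tag, multiple]
  end
end

# deliveries: delivery tags in arrival order; nil marks an empty poll.
def ack_in_batches(channel, deliveries, prefetch_count)
  pending = 0
  last_delivery_tag = nil

  deliveries.each do |tag|
    if tag.nil?
      # Idle: flush whatever has been processed but not yet acknowledged.
      if last_delivery_tag
        channel.ack(last_delivery_tag, true)
        pending = 0
        last_delivery_tag = nil
      end
      next
    end

    pending += 1
    if pending >= prefetch_count
      # A full batch: one ack with multiple=true covers all earlier tags.
      channel.ack(tag, true)
      pending = 0
      last_delivery_tag = nil
    else
      last_delivery_tag = tag
    end
  end
end

channel = RecordingChannel.new
ack_in_batches(channel, [1, 2, 3, 4, nil, nil, 5, nil], 3)
puts channel.acks.inspect
```

With a prefetch count of 3, the first three messages are covered by a single ack on tag 3, while messages 4 and 5 are each acknowledged when the following poll finds the queue empty.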
on_cancellation()
# File lib/logstash/inputs/rabbitmq.rb, line 287
def on_cancellation
  if !stop? # If this isn't already part of a regular stop
    @logger.info("Received basic.cancel from #{rabbitmq_settings[:host]}, shutting down.")
    stop
  end
end
register()
# File lib/logstash/inputs/rabbitmq.rb, line 170
def register
  @internal_queue = java.util.concurrent.ArrayBlockingQueue.new(@prefetch_count*2)
end
run(output_queue)
# File lib/logstash/inputs/rabbitmq.rb, line 174
def run(output_queue)
  setup!
  @output_queue = output_queue
  consume!
end
setup!()
# File lib/logstash/inputs/rabbitmq.rb, line 180
def setup!
  connect!
  declare_queue!
  bind_exchange!
  @hare_info.channel.prefetch = @prefetch_count
rescue => e
  @logger.warn("Error while setting up connection for rabbitmq input! Will retry.",
               :message => e.message,
               :class => e.class.name,
               :location => e.backtrace.first)
  sleep_for_retry
  retry
end
shutdown_consumer()
# File lib/logstash/inputs/rabbitmq.rb, line 278
def shutdown_consumer
  return unless @consumer
  @hare_info.channel.basic_cancel(@consumer.consumer_tag)
  until @consumer.terminated?
    @logger.info("Waiting for rabbitmq consumer to terminate before stopping!", :params => self.params)
    sleep 1
  end
end
stop()
# File lib/logstash/inputs/rabbitmq.rb, line 272
def stop
  @internal_queue.put(INTERNAL_QUEUE_POISON)
  shutdown_consumer
  close_connection
end
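`stop` ends the consume loop by enqueueing a sentinel value, a pattern often called a poison pill. A minimal sketch in plain Ruby follows, assuming a `SizedQueue` in place of the `java.util.concurrent.ArrayBlockingQueue` that `register` creates; `POISON` and the sample messages are illustrative.

```ruby
require "thread" # harmless on modern Rubies, needed for SizedQueue on old ones

POISON = :poison # hypothetical sentinel; the plugin uses INTERNAL_QUEUE_POISON

queue = SizedQueue.new(4) # bounded, so a slow consumer applies back-pressure
processed = []

consumer = Thread.new do
  loop do
    item = queue.pop        # blocks until an item is available
    break if item == POISON # sentinel received: leave the loop cleanly
    processed << item
  end
end

%w[a b c].each { |message| queue.push(message) }
queue.push(POISON) # the equivalent of what stop does first
consumer.join

puts processed.inspect
```

Because the sentinel travels through the same queue as the data, everything enqueued before it is still processed before the loop exits.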

Private Instance Methods

get_headers(metadata)
# File lib/logstash/inputs/rabbitmq.rb, line 295
def get_headers(metadata)
  metadata.headers || {}
end
get_properties(metadata)
# File lib/logstash/inputs/rabbitmq.rb, line 300
def get_properties(metadata)
  MESSAGE_PROPERTIES.reduce({}) do |acc, name|
    # The method names obviously can't contain hyphens.
    value = metadata.send(name.gsub("-", "_"))
    if value
      # The AMQP 0.9.1 timestamp field only has second resolution
      # so storing milliseconds serves no purpose and might give
      # the incorrect impression of a higher resolution.
      acc[name] = name != "timestamp" ? value : value.getTime / 1000
    end
    acc
  end
end
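The name mapping and timestamp handling in `get_properties` can be tried out in plain Ruby. In this sketch `FakeMetadata`, `SAMPLE_PROPERTY_NAMES`, and `extract_properties` are hypothetical: the struct stands in for March Hare's metadata object, the list is an illustrative subset rather than the real `MESSAGE_PROPERTIES`, and a Ruby `Time` replaces the Java date object on which the plugin calls `getTime`.

```ruby
# Hypothetical stand-in for the metadata object passed to get_properties.
FakeMetadata = Struct.new(:exchange, :routing_key, :timestamp)

# Illustrative subset of hyphenated property names.
SAMPLE_PROPERTY_NAMES = ["exchange", "routing-key", "timestamp"].map(&:freeze)

def extract_properties(metadata)
  SAMPLE_PROPERTY_NAMES.each_with_object({}) do |name, acc|
    # Property names use hyphens, accessor methods use underscores.
    value = metadata.public_send(name.tr("-", "_"))
    next unless value # unset properties are left out of the result

    # Keep whole seconds only, mirroring the getTime / 1000 in the plugin.
    acc[name] = name == "timestamp" ? value.to_i : value
  end
end

metadata = FakeMetadata.new("logs", nil, Time.at(1_700_000_000.75))
properties = extract_properties(metadata)
puts properties.inspect
```

The unset routing key is omitted from the result, and the fractional part of the timestamp is dropped.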