class LogStash::Outputs::SQS

Push events to an Amazon Web Services (AWS) Simple Queue Service (SQS) queue.

SQS is a simple, scalable queue system that is part of the Amazon Web Services suite of tools. Although SQS is similar to other queuing systems such as Advanced Message Queuing Protocol (AMQP), it uses a custom API and requires that you have an AWS account. See aws.amazon.com/sqs/ for more details on how SQS works, what the pricing schedule looks like and how to setup a queue.

The “consumer” identity must have the following permissions on the queue:

* `sqs:GetQueueUrl`
* `sqs:SendMessage`
* `sqs:SendMessageBatch`

Typically, you should setup an IAM policy, create a user and apply the IAM policy to the user. See aws.amazon.com/iam/ for more details on setting up AWS identities. A sample policy is as follows:

source,json

{

"Version": "2012-10-17",
"Statement": [
  {
    "Effect": "Allow",
    "Action": [
      "sqs:GetQueueUrl",
      "sqs:SendMessage",
      "sqs:SendMessageBatch"
    ],
    "Resource": "arn:aws:sqs:us-east-1:123456789012:my-sqs-queue"
  }
]

}

Batch Publishing

This output publishes messages to SQS in batches in order to optimize event throughput and increase performance. This is done using the [`SendMessageBatch`](docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessageBatch.html) API. When publishing messages to SQS in batches, the following service limits must be respected (see [Limits in Amazon SQS](docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-messages.html)):

* The maximum allowed individual message size is 256KiB.
* The maximum total payload size (i.e. the sum of the sizes of all
  individual messages within a batch) is also 256KiB.

This plugin will dynamically adjust the size of the batch published to SQS in order to ensure that the total payload size does not exceed 256KiB.

WARNING: This output cannot currently handle messages larger than 256KiB. Any single message exceeding this size will be dropped.

Public Instance Methods

multi_receive_encoded(encoded_events) click to toggle source
# File lib/logstash/outputs/sqs.rb, line 113
def multi_receive_encoded(encoded_events)
  if @batch_events > 1
    multi_receive_encoded_batch(encoded_events)
  else
    multi_receive_encoded_single(encoded_events)
  end
end
register() click to toggle source
# File lib/logstash/outputs/sqs.rb, line 90
def register
  @sqs = Aws::SQS::Client.new(aws_options_hash)

  if @batch_events > 10
    raise LogStash::ConfigurationError, 'The maximum batch size is 10 events'
  elsif @batch_events < 1
    raise LogStash::ConfigurationError, 'The batch size must be greater than 0'
 end

  begin
    params = { queue_name: @queue }
    params[:queue_owner_aws_account_id] = @queue_owner_aws_account_id if @queue_owner_aws_account_id

    @logger.debug('Connecting to SQS queue', params.merge(region: region))
    @queue_url = @sqs.get_queue_url(params)[:queue_url]
    @logger.info('Connected to SQS queue successfully', params.merge(region: region))
  rescue Aws::SQS::Errors::ServiceError => e
    @logger.error('Failed to connect to SQS', :error => e)
    raise LogStash::ConfigurationError, 'Verify the SQS queue name and your credentials'
  end
end

Private Instance Methods

multi_receive_encoded_batch(encoded_events) click to toggle source
# File lib/logstash/outputs/sqs.rb, line 122
def multi_receive_encoded_batch(encoded_events)
  bytes = 0
  entries = []

  # Split the events into multiple batches to ensure that no single batch
  # exceeds `@message_max_size` bytes.
  encoded_events.each_with_index do |encoded_event, index|
    event, encoded = encoded_event

    if encoded.bytesize > @message_max_size
      @logger.warn('Message exceeds maximum length and will be dropped', :message_size => encoded.bytesize)
      next
    end

    if entries.size >= @batch_events or (bytes + encoded.bytesize) > @message_max_size
      send_message_batch(entries)

      bytes = 0
      entries = []
    end

    bytes += encoded.bytesize
    entries.push(:id => index.to_s, :message_body => encoded)
  end

  send_message_batch(entries) unless entries.empty?
end
multi_receive_encoded_single(encoded_events) click to toggle source
# File lib/logstash/outputs/sqs.rb, line 151
def multi_receive_encoded_single(encoded_events)
  encoded_events.each do |encoded_event|
    event, encoded = encoded_event

    if encoded.bytesize > @message_max_size
      @logger.warn('Message exceeds maximum length and will be dropped', :message_size => encoded.bytesize)
      next
    end

    @sqs.send_message(:queue_url => @queue_url, :message_body => encoded)
  end
end
send_message_batch(entries) click to toggle source
# File lib/logstash/outputs/sqs.rb, line 165
def send_message_batch(entries)
  @logger.debug("Publishing #{entries.size} messages to SQS", :queue_url => @queue_url, :entries => entries)
  @sqs.send_message_batch(:queue_url => @queue_url, :entries => entries)
end