class LogStash::Outputs::SQS
Push events to an Amazon Web Services (AWS) Simple Queue Service (SQS
) queue.
SQS
is a simple, scalable queue system that is part of the Amazon Web Services suite of tools. Although SQS
is similar to other queuing systems such as Advanced Message Queuing Protocol (AMQP), it uses a custom API and requires that you have an AWS account. See aws.amazon.com/sqs/ for more details on how SQS
works, what the pricing schedule looks like and how to setup a queue.
The “consumer” identity must have the following permissions on the queue:
* `sqs:GetQueueUrl` * `sqs:SendMessage` * `sqs:SendMessageBatch`
Typically, you should setup an IAM policy, create a user and apply the IAM policy to the user. See aws.amazon.com/iam/ for more details on setting up AWS identities. A sample policy is as follows:
- source,json
-
{
"Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "sqs:GetQueueUrl", "sqs:SendMessage", "sqs:SendMessageBatch" ], "Resource": "arn:aws:sqs:us-east-1:123456789012:my-sqs-queue" } ]
}
Batch Publishing¶ ↑
This output publishes messages to
SQS
in batches in order to optimize event throughput and increase performance. This is done using the [`SendMessageBatch`](docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessageBatch.html) API. When publishing messages toSQS
in batches, the following service limits must be respected (see [Limits in Amazon SQS](docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-messages.html)):* The maximum allowed individual message size is 256KiB. * The maximum total payload size (i.e. the sum of the sizes of all individual messages within a batch) is also 256KiB.
This plugin will dynamically adjust the size of the batch published to
SQS
in order to ensure that the total payload size does not exceed 256KiB.WARNING: This output cannot currently handle messages larger than 256KiB. Any single message exceeding this size will be dropped.
Public Instance Methods
# File lib/logstash/outputs/sqs.rb, line 113 def multi_receive_encoded(encoded_events) if @batch_events > 1 multi_receive_encoded_batch(encoded_events) else multi_receive_encoded_single(encoded_events) end end
# File lib/logstash/outputs/sqs.rb, line 90 def register @sqs = Aws::SQS::Client.new(aws_options_hash) if @batch_events > 10 raise LogStash::ConfigurationError, 'The maximum batch size is 10 events' elsif @batch_events < 1 raise LogStash::ConfigurationError, 'The batch size must be greater than 0' end begin params = { queue_name: @queue } params[:queue_owner_aws_account_id] = @queue_owner_aws_account_id if @queue_owner_aws_account_id @logger.debug('Connecting to SQS queue', params.merge(region: region)) @queue_url = @sqs.get_queue_url(params)[:queue_url] @logger.info('Connected to SQS queue successfully', params.merge(region: region)) rescue Aws::SQS::Errors::ServiceError => e @logger.error('Failed to connect to SQS', :error => e) raise LogStash::ConfigurationError, 'Verify the SQS queue name and your credentials' end end
Private Instance Methods
# File lib/logstash/outputs/sqs.rb, line 122 def multi_receive_encoded_batch(encoded_events) bytes = 0 entries = [] # Split the events into multiple batches to ensure that no single batch # exceeds `@message_max_size` bytes. encoded_events.each_with_index do |encoded_event, index| event, encoded = encoded_event if encoded.bytesize > @message_max_size @logger.warn('Message exceeds maximum length and will be dropped', :message_size => encoded.bytesize) next end if entries.size >= @batch_events or (bytes + encoded.bytesize) > @message_max_size send_message_batch(entries) bytes = 0 entries = [] end bytes += encoded.bytesize entries.push(:id => index.to_s, :message_body => encoded) end send_message_batch(entries) unless entries.empty? end
# File lib/logstash/outputs/sqs.rb, line 151 def multi_receive_encoded_single(encoded_events) encoded_events.each do |encoded_event| event, encoded = encoded_event if encoded.bytesize > @message_max_size @logger.warn('Message exceeds maximum length and will be dropped', :message_size => encoded.bytesize) next end @sqs.send_message(:queue_url => @queue_url, :message_body => encoded) end end
# File lib/logstash/outputs/sqs.rb, line 165 def send_message_batch(entries) @logger.debug("Publishing #{entries.size} messages to SQS", :queue_url => @queue_url, :entries => entries) @sqs.send_message_batch(:queue_url => @queue_url, :entries => entries) end