class ActiveJob::QueueAdapters::ActiveElasticJobAdapter

Active Elastic Job adapter for Active Job

Active Elastic Job provides (1) an adapter (this class) for Rails' Active Job framework and (2) a Rack middleware to process job requests, which are sent by the SQS daemon running in Amazon Elastic Beanstalk worker environments.

This adapter serializes job objects and sends each one as a message to the Amazon SQS queue specified by the job's queue name (see ActiveJob::Base.queue_as).

To use Active Elastic Job, set the queue_adapter config to :active_elastic_job:

Rails.application.config.active_job.queue_adapter = :active_elastic_job

Constants

MAX_DELAY_IN_MINUTES
MAX_MESSAGE_SIZE

Private Class Methods

aws_client_verifies_md5_digests?() click to toggle source
# File lib/active_job/queue_adapters/active_elastic_job_adapter.rb, line 127
# Reports whether the loaded AWS core gem is at version 2.2.19 or newer.
#
# NOTE(review): judging by the method name, SDK releases from 2.2.19 on are
# presumed to verify MD5 digests themselves — confirm against the AWS SDK
# changelog.
#
# @return [Boolean] true when Aws::CORE_GEM_VERSION >= 2.2.19
def aws_client_verifies_md5_digests?
  installed_version = Gem::Version.new(Aws::CORE_GEM_VERSION)
  minimum_version = Gem::Version.new('2.2.19'.freeze)
  installed_version >= minimum_version
end
aws_region() click to toggle source
# File lib/active_job/queue_adapters/active_elastic_job_adapter.rb, line 186
# Reads the AWS region from the Active Elastic Job configuration.
#
# @return the value of config.aws_region
def aws_region
  adapter_settings = config
  adapter_settings.aws_region
end
aws_sqs_client() click to toggle source
# File lib/active_job/queue_adapters/active_elastic_job_adapter.rb, line 174
# Returns the SQS client, building it on first use.
#
# The client is created with the configured region and credentials and is
# memoized in @aws_sqs_client, so later calls reuse the same instance.
#
# @return [Aws::SQS::Client]
def aws_sqs_client
  @aws_sqs_client ||= begin
    region = aws_region
    credentials = aws_sqs_client_credentials
    Aws::SQS::Client.new(region: region, credentials: credentials)
  end
end
aws_sqs_client_credentials() click to toggle source
# File lib/active_job/queue_adapters/active_elastic_job_adapter.rb, line 178
# Resolves the credentials handed to the SQS client.
#
# When config.aws_credentials is a Proc it is called and its result is used;
# any other value is used as is. The outcome is memoized in @aws_credentials.
#
# @return the resolved credentials object
def aws_sqs_client_credentials
  @aws_credentials ||=
    case config.aws_credentials
    when Proc then config.aws_credentials.call
    else config.aws_credentials
    end
end
build_message(queue_name, serialized_job, timestamp) click to toggle source
# File lib/active_job/queue_adapters/active_elastic_job_adapter.rb, line 131
# Assembles the hash describing one SQS message for a serialized job.
#
# The helpers run in the same order as the keys appear in the result:
# queue_url, then calculate_delay, then message_digest. Errors raised by
# any of them propagate to the caller.
#
# @param queue_name name passed to queue_url to look up the target queue
# @param serialized_job payload used as the message body and digest input
# @param timestamp value passed to calculate_delay
# @return [Hash] with :queue_url, :message_body, :delay_seconds and
#   :message_attributes (a "message-digest" entry and an :origin entry)
def build_message(queue_name, serialized_job, timestamp)
  target_url = queue_url(queue_name)
  delay = calculate_delay(timestamp)

  digest_attribute = {
    string_value: message_digest(serialized_job),
    data_type: "String".freeze
  }
  origin_attribute = {
    string_value: ActiveElasticJob::ACRONYM,
    data_type: "String".freeze
  }

  {
    queue_url: target_url,
    message_body: serialized_job,
    delay_seconds: delay,
    message_attributes: {
      "message-digest".freeze => digest_attribute,
      origin: origin_attribute
    }
  }
end
calculate_delay(timestamp) click to toggle source
# File lib/active_job/queue_adapters/active_elastic_job_adapter.rb, line 159
# Converts an absolute timestamp into a whole-second delay from now.
#
# The difference to Time.current is truncated to an integer and one second
# is added. Timestamps that are already in the past yield 0.
#
# @param timestamp [Numeric] point in time, in the same unit as
#   Time.current.to_f (seconds since the epoch)
# @return [Integer] non-negative delay in seconds
# @raise [DelayTooLong] when the delay exceeds MAX_DELAY_IN_MINUTES minutes
def calculate_delay(timestamp)
  seconds_from_now = (timestamp - Time.current.to_f).to_i + 1
  raise DelayTooLong.new if seconds_from_now > MAX_DELAY_IN_MINUTES.minutes

  [seconds_from_now, 0].max
end
check_job_size!(serialized_job) click to toggle source
# File lib/active_job/queue_adapters/active_elastic_job_adapter.rb, line 168
# Guards against serialized jobs that exceed the message size limit.
#
# The size is measured in bytes (not characters) and compared against
# MAX_MESSAGE_SIZE.
#
# @param serialized_job [String] the serialized job payload
# @return [nil] when the payload fits
# @raise [SerializedJobTooBig] with the payload as argument when it is
#   larger than MAX_MESSAGE_SIZE bytes
def check_job_size!(serialized_job)
  return unless serialized_job.bytesize > MAX_MESSAGE_SIZE

  raise SerializedJobTooBig, serialized_job
end
config() click to toggle source
# File lib/active_job/queue_adapters/active_elastic_job_adapter.rb, line 190
# Fetches the Active Elastic Job settings from the Rails application
# configuration.
#
# @return the object stored at Rails.application.config.active_elastic_job
def config
  rails_config = Rails.application.config
  rails_config.active_elastic_job
end
message_digest(messsage_body) click to toggle source
# File lib/active_job/queue_adapters/active_elastic_job_adapter.rb, line 194
# Computes the digest that accompanies a message body.
#
# The ActiveElasticJob::MessageVerifier is built once from secret_key_base
# and memoized in @verifier for subsequent calls.
#
# @param messsage_body payload handed to MessageVerifier#generate_digest
# @return the value returned by MessageVerifier#generate_digest
def message_digest(messsage_body)
  verifier = (@verifier ||= ActiveElasticJob::MessageVerifier.new(secret_key_base))
  verifier.generate_digest(messsage_body)
end
queue_url(queue_name) click to toggle source
# File lib/active_job/queue_adapters/active_elastic_job_adapter.rb, line 149
# Looks up the SQS URL for a queue name, caching results per name.
#
# URLs are cached in @queue_urls under the stringified queue name, so the
# SQS client is asked only the first time a given queue is requested.
#
# @param queue_name [#to_s] name of the queue
# @return the queue_url reported by the SQS client
# @raise [NonExistentQueue] built from the queue name and aws_region when
#   the client raises Aws::SQS::Errors::NonExistentQueue
def queue_url(queue_name)
  cache_key = queue_name.to_s
  @queue_urls ||= {}
  cached_url = @queue_urls[cache_key]
  return cached_url if cached_url

  lookup = aws_sqs_client.get_queue_url(queue_name: queue_name.to_s)
  @queue_urls[cache_key] = lookup.queue_url
rescue Aws::SQS::Errors::NonExistentQueue
  raise NonExistentQueue.new(queue_name, aws_region)
end
secret_key_base() click to toggle source
# File lib/active_job/queue_adapters/active_elastic_job_adapter.rb, line 215
# Reads the secret key base from the Active Elastic Job configuration.
#
# @return the value of config.secret_key_base
def secret_key_base
  adapter_settings = config
  adapter_settings.secret_key_base
end
verify_md5_digests!(response, messsage_body, message_attributes) click to toggle source
# File lib/active_job/queue_adapters/active_elastic_job_adapter.rb, line 199
# Compares locally computed MD5 digests with those reported in a response.
#
# The body digest is always checked. The attribute digest is checked only
# when message_attributes is truthy.
#
# @param response object exposing message_id, md5_of_message_body and
#   md5_of_message_attributes
# @param messsage_body payload whose digest is computed locally
# @param message_attributes attributes whose digest is computed locally, or
#   nil/false to skip that comparison
# @return [nil] when every checked digest matches
# @raise [MD5MismatchError] carrying the message id, the calculated digest
#   and the returned digest on the first mismatch
def verify_md5_digests!(response, messsage_body, message_attributes)
  expected_body_md5 = md5_of_message_body(messsage_body)
  reported_body_md5 = response.md5_of_message_body
  if expected_body_md5 != reported_body_md5
    raise MD5MismatchError.new(response.message_id, expected_body_md5, reported_body_md5)
  end

  return unless message_attributes

  expected_attributes_md5 = md5_of_message_attributes(message_attributes)
  reported_attributes_md5 = response.md5_of_message_attributes
  if expected_attributes_md5 != reported_attributes_md5
    raise MD5MismatchError.new(response.message_id, expected_attributes_md5, reported_attributes_md5)
  end

  nil
end