class ActiveJob::QueueAdapters::ActiveElasticJobAdapter
Active Elastic Job adapter for Active Job
Active Elastic Job provides (1) an adapter (this class) for Rails' Active Job framework and (2) a Rack middleware to process job requests, which are sent by the SQS daemon running in Amazon Elastic Beanstalk worker environments.
This adapter serializes job objects and sends them as messages to the Amazon SQS queue specified by the job's queue name; see ActiveJob::Base.queue_as.
To use Active Elastic Job, set the queue_adapter config to :active_elastic_job.
Rails.application.config.active_job.queue_adapter = :active_elastic_job
Constants
- MAX_DELAY_IN_MINUTES
- MAX_MESSAGE_SIZE
Private Class Methods
aws_client_verifies_md5_digests?()
click to toggle source
# File lib/active_job/queue_adapters/active_elastic_job_adapter.rb, line 127
#
# Reports whether the loaded AWS SDK core gem is at version 2.2.19 or newer.
#
# Returns true when Aws::CORE_GEM_VERSION is >= 2.2.19, false otherwise.
# NOTE(review): judging by the method name, SDK releases from 2.2.19 on
# presumably verify MD5 digests themselves -- confirm against the caller.
def aws_client_verifies_md5_digests?
  installed = Gem::Version.new(Aws::CORE_GEM_VERSION)
  minimum = Gem::Version.new('2.2.19'.freeze)
  installed >= minimum
end
aws_region()
click to toggle source
# File lib/active_job/queue_adapters/active_elastic_job_adapter.rb, line 186
#
# Returns the aws_region value held by the object that #config returns.
def aws_region
  settings = config
  settings.aws_region
end
aws_sqs_client()
click to toggle source
# File lib/active_job/queue_adapters/active_elastic_job_adapter.rb, line 174
#
# Returns the memoized Aws::SQS::Client, building it on first use from
# #aws_region and #aws_sqs_client_credentials.
def aws_sqs_client
  @aws_sqs_client ||= begin
    # Region is resolved before credentials, matching the original
    # argument evaluation order.
    region = aws_region
    credentials = aws_sqs_client_credentials
    Aws::SQS::Client.new(region: region, credentials: credentials)
  end
end
aws_sqs_client_credentials()
click to toggle source
# File lib/active_job/queue_adapters/active_elastic_job_adapter.rb, line 178
#
# Returns the credentials handed to the SQS client, memoized in
# @aws_credentials.
#
# When the configured aws_credentials value is a Proc it is called and its
# result is used; any other value is used as-is. Because of `||=`, a nil or
# false result is not cached and is resolved again on the next call.
def aws_sqs_client_credentials
  @aws_credentials ||=
    case config.aws_credentials
    when Proc then config.aws_credentials.call
    else config.aws_credentials
    end
end
build_message(queue_name, serialized_job, timestamp)
click to toggle source
# File lib/active_job/queue_adapters/active_elastic_job_adapter.rb, line 131
#
# Assembles the hash describing one SQS message for a serialized job.
#
# queue_name     - name resolved to a queue URL through #queue_url.
# serialized_job - string used as the message body and as digest input.
# timestamp      - passed to #calculate_delay to derive delay_seconds.
#
# Returns a Hash with :queue_url, :message_body, :delay_seconds and
# :message_attributes (a "message-digest" entry and an :origin entry).
def build_message(queue_name, serialized_job, timestamp)
  # Resolved in the same order as the original hash literal so that any
  # error raised by a helper surfaces in the same sequence.
  url = queue_url(queue_name)
  delay = calculate_delay(timestamp)

  digest_attribute = {
    string_value: message_digest(serialized_job),
    data_type: "String".freeze
  }
  origin_attribute = {
    string_value: ActiveElasticJob::ACRONYM,
    data_type: "String".freeze
  }

  {
    queue_url: url,
    message_body: serialized_job,
    delay_seconds: delay,
    message_attributes: {
      "message-digest".freeze => digest_attribute,
      origin: origin_attribute
    }
  }
end
calculate_delay(timestamp)
click to toggle source
# File lib/active_job/queue_adapters/active_elastic_job_adapter.rb, line 159
#
# Converts a target timestamp into a whole-second delay from now.
#
# timestamp - numeric value compared against Time.current.to_f
#             (presumably epoch seconds -- confirm against the caller).
#
# Returns the truncated difference plus one second, never less than 0.
# Raises DelayTooLong when the delay exceeds MAX_DELAY_IN_MINUTES minutes.
def calculate_delay(timestamp)
  seconds_from_now = (timestamp - Time.current.to_f).to_i + 1
  raise DelayTooLong.new if seconds_from_now > MAX_DELAY_IN_MINUTES.minutes

  # Timestamps in the past collapse to "no delay".
  [seconds_from_now, 0].max
end
check_job_size!(serialized_job)
click to toggle source
# File lib/active_job/queue_adapters/active_elastic_job_adapter.rb, line 168
#
# Guards against serialized jobs larger than MAX_MESSAGE_SIZE bytes.
#
# serialized_job - string whose bytesize is checked.
#
# Returns nil when the job fits.
# Raises SerializedJobTooBig (with the serialized job as its argument)
# when the byte size exceeds MAX_MESSAGE_SIZE.
def check_job_size!(serialized_job)
  oversized = serialized_job.bytesize > MAX_MESSAGE_SIZE
  raise SerializedJobTooBig, serialized_job if oversized
end
config()
click to toggle source
# File lib/active_job/queue_adapters/active_elastic_job_adapter.rb, line 190
#
# Returns the active_elastic_job section of the Rails application config.
def config
  app_config = Rails.application.config
  app_config.active_elastic_job
end
message_digest(messsage_body)
click to toggle source
# File lib/active_job/queue_adapters/active_elastic_job_adapter.rb, line 194
#
# Computes the digest attached to an outgoing message.
#
# messsage_body - value handed to the verifier's generate_digest.
#
# The ActiveElasticJob::MessageVerifier is built once from #secret_key_base
# and memoized in @verifier.
#
# Returns whatever generate_digest returns for the given body.
def message_digest(messsage_body)
  verifier = (@verifier ||= ActiveElasticJob::MessageVerifier.new(secret_key_base))
  verifier.generate_digest(messsage_body)
end
queue_url(queue_name)
click to toggle source
# File lib/active_job/queue_adapters/active_elastic_job_adapter.rb, line 149
#
# Resolves a queue name to its SQS queue URL, caching results in @queue_urls.
#
# queue_name - queue identifier; its to_s form is both the cache key and
#              the name sent to SQS.
#
# Returns the cached URL when one exists, otherwise the queue_url from the
# client's get_queue_url response (which is then cached).
# Raises NonExistentQueue (built from queue_name and #aws_region) when the
# client raises Aws::SQS::Errors::NonExistentQueue.
def queue_url(queue_name)
  cache_key = queue_name.to_s
  @queue_urls ||= {}
  # Only hit SQS on a cache miss; the string form is reused for the request
  # instead of calling to_s a second time.
  @queue_urls[cache_key] ||= aws_sqs_client.get_queue_url(queue_name: cache_key).queue_url
rescue Aws::SQS::Errors::NonExistentQueue
  # The SDK error itself is not needed, so it is not bound to a variable.
  raise NonExistentQueue.new(queue_name, aws_region)
end
secret_key_base()
click to toggle source
# File lib/active_job/queue_adapters/active_elastic_job_adapter.rb, line 215
#
# Returns the secret_key_base value held by the object that #config returns.
def secret_key_base
  settings = config
  settings.secret_key_base
end
verify_md5_digests!(response, messsage_body, message_attributes)
click to toggle source
# File lib/active_job/queue_adapters/active_elastic_job_adapter.rb, line 199
#
# Compares locally computed MD5 digests with those reported in a response.
#
# response           - object exposing md5_of_message_body,
#                      md5_of_message_attributes and message_id.
# messsage_body      - body passed to md5_of_message_body.
# message_attributes - attributes passed to md5_of_message_attributes;
#                      the attribute check is skipped when this is nil/false.
#
# Returns nil when every checked digest matches.
# Raises MD5MismatchError (message id, calculated digest, returned digest)
# on the first mismatch; the body is checked before the attributes.
def verify_md5_digests!(response, messsage_body, message_attributes)
  expected_body = md5_of_message_body(messsage_body)
  received_body = response.md5_of_message_body
  if expected_body != received_body
    raise MD5MismatchError.new(response.message_id, expected_body, received_body)
  end

  return unless message_attributes

  expected_attrs = md5_of_message_attributes(message_attributes)
  received_attrs = response.md5_of_message_attributes
  if expected_attrs != received_attrs
    raise MD5MismatchError.new(response.message_id, expected_attrs, received_attrs)
  end
end