class ActiveElasticJob::Rack::SqsMessageConsumer
This middleware intercepts requests which are sent by the SQS daemon running in Amazon Elastic Beanstalk worker environments. It does this by looking at the User-Agent
header. Requests from the SQS daemon are handled in two alternative cases:
(1) the processed SQS message was originally triggered by a periodic task supported by Elastic Beanstalk's Periodic Task feature
(2) the processed SQS message was queued by this gem representing an active job. In this case it verifies the digest which is sent along with a legitimate SQS message, and passed as an HTTP header in the resulting request. The digest is based on Rails' secrets.secret_key_base. Therefore, the application running in the web environment, which generates the digest, and the application running in the worker environment, which verifies the digest, have to use the same secrets.secret_key_base setting.
Constants
- DOCKER_HOST_IP
- FORBIDDEN_RESPONSE
- OK_RESPONSE
Private Instance Methods
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 124
#
# Reports whether the cgroup listing of PID 1 (/proc/1/cgroup) mentions
# "docker".
#
# The probe runs at most once per instance: the answer is cached in
# @app_in_docker_container, and a negative answer is cached as well. (A plain
# `||=` would repeat the probe on every call whenever the answer is falsy.)
# The file is read directly instead of spawning a shell.
#
# Returns true or false. A missing or unreadable file yields false.
def app_runs_in_docker_container?
  return @app_in_docker_container if defined?(@app_in_docker_container)

  @app_in_docker_container =
    begin
      File.file?('/proc/1/cgroup') &&
        File.read('/proc/1/cgroup').include?('docker')
    rescue SystemCallError
      # e.g. permission denied or the file vanished between check and read
      false
    end
end
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 78
#
# Tells whether the request's User-Agent header begins with "aws-sqsd".
#
# The check is a plain string-prefix comparison rather than a Regexp match;
# the original authors' benchmarks showed up to 40% better performance for
# the string comparison.
#
# request - an object whose #headers responds to [] with a header name.
#
# Returns true when the header is set and starts with "aws-sqsd",
# false otherwise (including when the header is missing or empty).
def aws_sqsd?(request)
  user_agent = request.headers['User-Agent'.freeze]
  !user_agent.nil? && user_agent.start_with?('aws-sqsd'.freeze)
end
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 74
#
# Returns the active_elastic_job settings object stored on the Rails
# application's configuration.
def config
  rails_config = Rails.application.config
  rails_config.active_elastic_job
end
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 58
#
# Returns true only when the process_jobs setting is exactly `true`;
# any other value (including other truthy values) yields false.
def enabled?
  settings = Rails.application.config.active_elastic_job
  settings.process_jobs == true
end
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 98
#
# Verifies the request with verify!, then loads the job data from the
# request body and hands it to ActiveJob::Base.execute.
#
# Verification runs first, so the body is only deserialized if verify!
# returns without raising.
def execute_job(request)
  verify!(request)
  # NOTE(review): JSON.load is more permissive than JSON.parse; the body is
  # digest-checked above, but confirm JSON.load is still intended here.
  ActiveJob::Base.execute(JSON.load(request.body))
end
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 104
#
# Runs the periodic task named by the X-Aws-Sqsd-Taskname header: the header
# value is resolved to a constant, instantiated without arguments, and
# perform_now is called on the new instance.
#
# NOTE(review): the class name comes straight from a request header and is
# passed to constantize unchecked; a missing header would call constantize
# on nil. Confirm the request origin is validated before this point.
def execute_periodic_task(request)
  task_class = request.headers['X-Aws-Sqsd-Taskname'].constantize
  task_class.new.perform_now
end
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 110
#
# Decides whether the request carries a message queued by this gem.
#
# Returns true when the origin header equals ActiveElasticJob::ACRONYM, or,
# failing that, when a message-digest header is present at all; false
# otherwise.
def originates_from_gem?(request)
  headers = request.headers
  return true if headers['HTTP_X_AWS_SQSD_ATTR_ORIGIN'.freeze] == ActiveElasticJob::ACRONYM

  # Fall back to the mere presence of a digest header.
  !headers['HTTP_X_AWS_SQSD_ATTR_MESSAGE_DIGEST'.freeze].nil?
end
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 94
#
# Tells whether the request's full path begins with the configured periodic
# tasks route.
#
# Returns false when the request has no full path; otherwise the result of
# comparing the leading slice of the path with the route.
def periodic_task?(request)
  path = request.fullpath
  return false if path.nil?

  route = periodic_tasks_route
  # Compare the first route.size characters of the path against the route.
  path[0..(route.size - 1)] == route
end
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 90
#
# Returns the periodic_tasks_route setting from config, caching a truthy
# value in @periodic_tasks_route so later calls skip the lookup.
def periodic_tasks_route
  @periodic_tasks_route || (@periodic_tasks_route = config.periodic_tasks_route)
end
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 70
#
# Returns the secret_key_base setting read from config.
def secret_key_base
  settings = config
  settings.secret_key_base
end
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 120
#
# Tells whether the request was sent from the Docker host: the app must be
# running inside a Docker container and the request's remote IP must equal
# DOCKER_HOST_IP.
#
# When the container check is falsy, its value is returned unchanged and the
# remote IP is not consulted.
def sent_from_docker_host?(request)
  in_container = app_runs_in_docker_container?
  return in_container unless in_container

  request.remote_ip == DOCKER_HOST_IP
end
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 62
#
# Checks the request body against the digest sent in the
# HTTP_X_AWS_SQSD_ATTR_MESSAGE_DIGEST header.
#
# The verifier is built from secret_key_base on first use and cached in
# @verifier. The body stream is read in full and then rewound so it can be
# read again afterwards.
#
# Returns whatever the verifier's verify! returns.
# NOTE(review): presumably the verifier raises on a mismatch - confirm
# against ActiveElasticJob::MessageVerifier.
def verify!(request)
  @verifier ||= ActiveElasticJob::MessageVerifier.new(secret_key_base)

  expected_digest = request.headers['HTTP_X_AWS_SQSD_ATTR_MESSAGE_DIGEST'.freeze]
  payload = request.body_stream.read
  request.body_stream.rewind

  @verifier.verify!(payload, expected_digest)
end