class ActiveElasticJob::Rack::SqsMessageConsumer
This middleware intercepts requests which are sent by the SQS daemon running in Amazon Elastic Beanstalk worker environments. It does this by looking at the User-Agent
header. Requesets from the SQS daemon are handled in two alternative cases:
(1) the processed SQS message was originally triggered by a periodic task supported by Elastic Beanstalk’s Periodic Task feature
(2) the processed SQS message was queued by this gem representing an active job. In this case it verifies the digest which is sent along with a legit SQS message, and passed as an HTTP header in the resulting request. The digest is based on Rails’ secrets.secret_key_base
. Therefore, the application running in the web environment, which generates the digest, and the application running in the worker environment, which verifies the digest, have to use the same secrets.secret_key_base
setting.
Constants
- DOCKER_HOST_IP
172.17.0.x is the default for Docker 172.18.0.x is the default for the bridge network of Docker Compose
- FORBIDDEN_RESPONSE
- OK_RESPONSE
Private Instance Methods
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 143 def app_runs_in_docker_container? (`[ -f /proc/1/cgroup ] && cat /proc/1/cgroup` =~ /(ecs|docker)/).present? end
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 81 def aws_sqsd?(request) # Does not match against a Regexp # in order to avoid performance penalties. # Instead performs a simple string comparison. # Benchmark runs showed an performance increase of # up to 40% current_user_agent = request.headers['User-Agent'.freeze] return (current_user_agent.present? && current_user_agent.size >= 'aws-sqsd'.freeze.size && current_user_agent[0..('aws-sqsd'.freeze.size - 1)] == 'aws-sqsd'.freeze) end
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 77 def config Rails.application.config.active_elastic_job end
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 61 def enabled? Rails.application.config.active_elastic_job.process_jobs == true end
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 113 def execute_job(request) verify!(request) job = JSON.load(request.body) ActiveJob::Base.execute(job) end
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 119 def execute_periodic_task(request) job_name = request.headers['X-Aws-Sqsd-Taskname'] job = job_name.constantize.new job.perform_now end
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 139 def ip_originates_from_docker?(request) (request.remote_ip =~ DOCKER_HOST_IP).present? or (request.remote_addr =~ DOCKER_HOST_IP).present? end
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 125 def originates_from_gem?(request) if request.headers['HTTP_X_AWS_SQSD_ATTR_ORIGIN'.freeze] == ActiveElasticJob::ACRONYM return true elsif request.headers['HTTP_X_AWS_SQSD_ATTR_MESSAGE_DIGEST'.freeze] != nil return true else return false end end
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 109 def periodic_task?(request) !request.fullpath.nil? && request.fullpath[0..(periodic_tasks_route.size - 1)] == periodic_tasks_route end
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 105 def periodic_tasks_route @periodic_tasks_route ||= config.periodic_tasks_route end
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 73 def secret_key_base config.secret_key_base end
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 135 def sent_from_docker_host?(request) app_runs_in_docker_container? && ip_originates_from_docker?(request) end
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 93 def sqsd?(request) # Does not match against a Regexp # in order to avoid performance penalties. # Instead performs a simple string comparison. # Benchmark runs showed an performance increase of # up to 40% current_user_agent = request.headers['User-Agent'.freeze] return (current_user_agent.present? && current_user_agent.size >= 'sqsd'.freeze.size && current_user_agent[0..('sqsd'.freeze.size - 1)] == 'sqsd'.freeze) end
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 65 def verify!(request) @verifier ||= ActiveElasticJob::MessageVerifier.new(secret_key_base) digest = request.headers['HTTP_X_AWS_SQSD_ATTR_MESSAGE_DIGEST'.freeze] message = request.body_stream.read request.body_stream.rewind @verifier.verify!(message, digest) end