class ActiveElasticJob::Rack::SqsMessageConsumer

This middleware intercepts requests sent by the SQS daemon running in Amazon Elastic Beanstalk worker environments. It identifies these requests by inspecting the User-Agent header. Requests from the SQS daemon are handled in one of two cases:

(1) The processed SQS message was triggered by a periodic task, as supported by Elastic Beanstalk's Periodic Tasks feature.

(2) The processed SQS message was queued by this gem and represents an Active Job. In this case the middleware verifies the digest that accompanies a legitimate SQS message and is passed as an HTTP header in the resulting request. The digest is based on Rails' secrets.secret_key_base. Therefore the application running in the web environment, which generates the digest, and the application running in the worker environment, which verifies it, must use the same secrets.secret_key_base setting.
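The digest scheme described in case (2) can be illustrated with a minimal sketch. It assumes an HMAC over the message body keyed with the shared secret. The hash function (SHA-256 here), the helper names generate_digest and digest_valid?, and the sample secret are illustrative assumptions, not the gem's actual MessageVerifier implementation.

```ruby
require 'openssl'

# Hypothetical helper: hex-encoded HMAC of the message body, keyed with the
# shared secret. SHA-256 is an assumption made for this sketch.
def generate_digest(secret, message)
  OpenSSL::HMAC.hexdigest(OpenSSL::Digest.new('SHA256'), secret, message)
end

# Hypothetical helper: recomputes the digest and compares it with the one
# received, without returning early on the first differing byte.
def digest_valid?(secret, message, digest)
  return false if message.nil? || message.empty? || digest.nil? || digest.empty?

  expected = generate_digest(secret, message)
  return false unless expected.bytesize == digest.bytesize

  diff = 0
  expected.bytes.zip(digest.bytes) { |a, b| diff |= a ^ b }
  diff.zero?
end

secret = 'shared-secret-key-base'      # sample value; must match in both environments
body   = '{"job_class":"ExampleJob"}'  # sample message body
digest = generate_digest(secret, body) # computed on the web environment side

digest_valid?(secret, body, digest)           # worker side, same secret
digest_valid?('another-secret', body, digest) # worker side, mismatched secret
```

With a mismatched secret the recomputed digest differs from the one sent with the message, which is why both environments need the same secret_key_base.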

Constants

DOCKER_HOST_IP
FORBIDDEN_RESPONSE
OK_RESPONSE

Private Instance Methods

app_runs_in_docker_container?()
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 124
def app_runs_in_docker_container?
  @app_in_docker_container ||= `[ -f /proc/1/cgroup ] && cat /proc/1/cgroup` =~ /docker/
end
aws_sqsd?(request)
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 78
def aws_sqsd?(request)
  # Does not match against a Regexp
  # in order to avoid performance penalties.
  # Instead performs a simple string comparison.
  # Benchmark runs showed a performance increase of
  # up to 40%
  current_user_agent = request.headers['User-Agent'.freeze]
  return (current_user_agent.present? &&
    current_user_agent.size >= 'aws-sqsd'.freeze.size &&
    current_user_agent[0..('aws-sqsd'.freeze.size - 1)] == 'aws-sqsd'.freeze)
end
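The comparison in aws_sqsd? is a prefix test on the User-Agent value. As a minimal sketch, the same test can be written with String#start_with?, which also avoids a Regexp. The method name sqsd_user_agent?, the constant SQSD_PREFIX, and the sample header values are hypothetical; the sketch takes the raw header string rather than a request object and makes no claim about relative performance.

```ruby
# Hypothetical stand-alone version of the prefix check.
SQSD_PREFIX = 'aws-sqsd'.freeze

# Returns true only when the header value is present and begins with the prefix.
def sqsd_user_agent?(user_agent)
  !user_agent.nil? && user_agent.start_with?(SQSD_PREFIX)
end

sqsd_user_agent?('aws-sqsd/2.0') # sample daemon-style value
sqsd_user_agent?('Mozilla/5.0')  # sample browser-style value
sqsd_user_agent?(nil)            # header missing
```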
config()
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 74
def config
  Rails.application.config.active_elastic_job
end
enabled?()
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 58
def enabled?
  Rails.application.config.active_elastic_job.process_jobs == true
end
execute_job(request)
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 98
def execute_job(request)
  verify!(request)
  job = JSON.load(request.body)
  ActiveJob::Base.execute(job)
end
execute_periodic_task(request)
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 104
def execute_periodic_task(request)
  job_name = request.headers['X-Aws-Sqsd-Taskname']
  job = job_name.constantize.new
  job.perform_now
end
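execute_periodic_task resolves the task name from the X-Aws-Sqsd-Taskname header to a class and runs it, so the task name has to be the name of a job class. The following is a minimal sketch of that lookup in plain Ruby, using Object.const_get in place of ActiveSupport's constantize. ExampleCleanupJob, run_periodic_task, and the plain Hash standing in for the request headers are hypothetical.

```ruby
# Hypothetical job class; a real job would inherit from ActiveJob::Base.
class ExampleCleanupJob
  def perform_now
    :cleaned
  end
end

# Hypothetical helper: looks up the class named in the header and runs it.
# Object.const_get raises NameError when no such constant exists.
def run_periodic_task(headers)
  job_name  = headers['X-Aws-Sqsd-Taskname']
  job_class = Object.const_get(job_name)
  job_class.new.perform_now
end

run_periodic_task('X-Aws-Sqsd-Taskname' => 'ExampleCleanupJob')
```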
originates_from_gem?(request)
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 110
def originates_from_gem?(request)
  if request.headers['HTTP_X_AWS_SQSD_ATTR_ORIGIN'.freeze] == ActiveElasticJob::ACRONYM
    return true
  elsif request.headers['HTTP_X_AWS_SQSD_ATTR_MESSAGE_DIGEST'.freeze] != nil
    return true
  else
    return false
  end
end
periodic_task?(request)
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 94
def periodic_task?(request)
  !request.fullpath.nil? && request.fullpath[0..(periodic_tasks_route.size - 1)] == periodic_tasks_route
end
periodic_tasks_route()
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 90
def periodic_tasks_route
  @periodic_tasks_route ||= config.periodic_tasks_route
end
secret_key_base()
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 70
def secret_key_base
  config.secret_key_base
end
sent_from_docker_host?(request)
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 120
def sent_from_docker_host?(request)
  app_runs_in_docker_container? && request.remote_ip == DOCKER_HOST_IP
end
verify!(request)
# File lib/active_elastic_job/rack/sqs_message_consumer.rb, line 62
def verify!(request)
  @verifier ||= ActiveElasticJob::MessageVerifier.new(secret_key_base)
  digest = request.headers['HTTP_X_AWS_SQSD_ATTR_MESSAGE_DIGEST'.freeze]
  message = request.body_stream.read
  request.body_stream.rewind
  @verifier.verify!(message, digest)
end