class MU::Cloud::AWS::MsgQueue

A MsgQueue as configured in {MU::Config::BasketofKittens::msg_queues}

Public Class Methods

cleanup(noop: false, deploy_id: MU.deploy_id, ignoremaster: false, region: MU.curRegion, credentials: nil, flags: {}) click to toggle source

Remove all msg_queues associated with the currently loaded deployment. @param noop [Boolean]: If true, will only print what would be done @param ignoremaster [Boolean]: If true, will remove resources not flagged as originating from this Mu server @param region [String]: The cloud provider region @return [void]

# File modules/mu/providers/aws/msg_queue.rb, line 137
# Remove all SQS queues whose names begin with the given deploy id.
# @param noop [Boolean]: If true, will only log what would be deleted
# @param deploy_id [String]: Deployment identifier, used as the queue name prefix to match
# @param ignoremaster [Boolean]: Accepted for interface parity; only logged here
# @param region [String]: The cloud provider region
# @param credentials [String]: Passed through to the SQS client factory
# @param flags [Hash]: Accepted for interface parity; only logged here
# @return [void]
def self.cleanup(noop: false, deploy_id: MU.deploy_id, ignoremaster: false, region: MU.curRegion, credentials: nil, flags: {})
  MU.log "AWS::MsgQueue.cleanup: need to support flags['known']", MU::DEBUG, details: flags
  MU.log "Placeholder: AWS MsgQueue artifacts do not support tags, so ignoremaster cleanup flag has no effect", MU::DEBUG, details: ignoremaster

  # The name prefix is the only filter applied below. A blank prefix would
  # match every queue in the region, so refuse to proceed without one.
  if deploy_id.nil? || deploy_id.to_s.empty?
    MU.log "AWS::MsgQueue.cleanup: no deploy_id available, refusing to list SQS queues without a name prefix", MU::WARN
    return
  end

  resp = MU::Cloud::AWS.sqs(credentials: credentials, region: region).list_queues(
    queue_name_prefix: deploy_id
  )
  return if !resp || !resp.queue_urls

  # One thread per queue, so the post-delete waits overlap instead of adding up
  threads = resp.queue_urls.map { |url|
    Thread.new {
      MU.log "Deleting SQS queue #{url}"
      if !noop
        MU::Cloud::AWS.sqs(credentials: credentials, region: region).delete_queue(
          queue_url: url
        )
        sleep 60 # per API docs, this is how long it takes to really delete
      end
    }
  }
  threads.each(&:join)
end
find(**args) click to toggle source

Locate an existing msg_queue. @return [Hash]: AWS doesn't return anything but the SQS URL, so supplement with attributes

# File modules/mu/providers/aws/msg_queue.rb, line 165
# Locate SQS queues and describe them.
#
# With +cloud_id+ set to a queue URL or a plain queue name, describes that one
# queue. Without +cloud_id+, describes every queue that +list_queues+ returns.
# Queues that no longer exist are left out of the result rather than raising.
# @param args [Hash]: Recognized keys are +:cloud_id+, +:region+, +:credentials+ and +:flags+ (whose 'account' entry names the queue owner and defaults to MU.account_number)
# @return [Hash<String,Hash>]: Queue attributes keyed by queue URL, each supplemented with a "Url" entry. Empty if nothing matched.
def self.find(**args)
  args[:flags] ||= {}
  args[:flags]['account'] ||= MU.account_number
  found = {}
  sqs = MU::Cloud::AWS.sqs(region: args[:region], credentials: args[:credentials])

  urls = if args[:cloud_id].nil?
    sqs.list_queues.queue_urls
  elsif args[:cloud_id].match(/^https?:/i)
    [args[:cloud_id]]
  else
    # A plain queue name: resolve it to a URL, first as owned by the
    # requested account, then without naming an owner at all.
    resolved = nil
    [{ queue_owner_aws_account_id: args[:flags]['account'] }, {}].each { |owner|
      begin
        resp = sqs.get_queue_url(queue_name: args[:cloud_id], **owner)
        resolved = resp.queue_url if resp
      rescue ::Aws::SQS::Errors::NonExistentQueue
        next
      end
      break if resolved
    }
    return found if !resolved
    [resolved]
  end

  # Go fetch attributes for each candidate URL
  (urls || []).each { |url|
    begin
      resp = sqs.get_queue_attributes(
        queue_url: url,
        attribute_names: ["All"]
      )
    rescue ::Aws::SQS::Errors::NonExistentQueue
      # A stale URL means "not found", not a failure of the search itself
      next
    end
    if resp and resp.attributes
      desc = resp.attributes.dup
      desc["Url"] = url
      found[url] = desc
    end
  }

  found
end
isGlobal?() click to toggle source

Does this resource type exist as a global (cloud-wide) artifact, or is it localized to a region/zone? @return [Boolean]

# File modules/mu/providers/aws/msg_queue.rb, line 122
# Report whether this resource type is a global (cloud-wide) artifact rather
# than one localized to a region/zone.
# @return [Boolean]: Always false for this resource type
def self.isGlobal?
  false
end
new(**args) click to toggle source

Initialize this cloud resource object. Calling super will invoke the initializer defined under {MU::Cloud}, which should set the attributes listed in {MU::Cloud::PUBLIC_ATTRS} as well as applicable dependency shortcuts, like +@vpc+, for us. @param args [Hash]: Hash of named arguments passed via Ruby's double-splat

Calls superclass method
# File modules/mu/providers/aws/msg_queue.rb, line 23
# Set up this cloud resource object. The superclass initializer runs first
# with the same arguments; afterwards, if +@mu_name+ is still unset, a name
# is generated from this resource's configured "name".
# @param args [Hash]: Hash of named arguments passed via Ruby's double-splat
def initialize(**args)
  super
  return if @mu_name
  @mu_name = @deploy.getResourceName(@config["name"])
end
quality() click to toggle source

Denote whether this resource implementation is experimental, ready for testing, or ready for production use.

# File modules/mu/providers/aws/msg_queue.rb, line 128
# Denote the maturity level of this resource implementation.
# @return [Object]: The MU::Cloud::RELEASE constant
def self.quality
  MU::Cloud::RELEASE
end
schema(_config) click to toggle source

Cloud-specific configuration properties. @param _config [MU::Config]: The calling MU::Config object @return [Array<Array,Hash>]: List of required fields, and json-schema Hash of cloud-specific configuration parameters for this resource

# File modules/mu/providers/aws/msg_queue.rb, line 232
        # Cloud-specific configuration properties.
        # @param _config [MU::Config]: The calling MU::Config object (unused here)
        # @return [Array<Array,Hash>]: List of required fields, and json-schema Hash of cloud-specific configuration parameters for this resource
        def self.schema(_config)
          # Builds one leaf property; the "default" key is only present when a default is supplied
          leaf = lambda { |type, description, *default|
            prop = { "type" => type, "description" => description }
            prop["default"] = default.first unless default.empty?
            prop
          }

          failqueue_props = {
            "create" => leaf.call("boolean", "Create a separate MsgQueue on the fly."),
            "retries_before_fail" => leaf.call("integer", "Number of times a message should fail before being sent to this queue. Must be between 1 and 1000.", 10),
            "name" => leaf.call("string", "The name of a sibling SQS resource in this deploy, or the cloud identifier or URL of a pre-existing one")
          }

          kms_props = {
            "key_id" => leaf.call("string", "KMS key to use for encryption and decryption"),
            "key_reuse_period" => leaf.call("string", "The length of time, in seconds, for which Amazon SQS can reuse a data key to encrypt or decrypt messages before calling AWS KMS again. You can specify a string like '5m' or '2 hours'.", "5 minutes")
          }

          # TODO an "iam_policy" property is deliberately absent: it doesn't work
          # as either an ARN, short identifier, or full JSON policy descriptor.
          # Docs are vague. Need to ask AWS.
          properties = {
            "max_msg_size" => leaf.call("integer", "Maximum size of messages in this queue, in kB. Must be between 1 and 256.", 256),
            "retain" => leaf.call("string", "The length of time for which Amazon SQS retains a message. Assumed to be in seconds, unless you specify a string like '4d' or 'five hours'. Must be between 1 minute and 14 days.", "4 days"),
            "delay" => leaf.call("string", "Delay delivery by up to 15 minutes. You can specify a string like '1m' or '600 seconds'.", "0 seconds"),
            "receive_timeout" => leaf.call("string", "The length of time, for which a ReceiveMessage action waits for a message to arrive, between 0 and 20 seconds. You can specify a string like '5s' or '20 seconds'.", "0 seconds"),
            "visibility_timeout" => leaf.call("string", "The length of time during which Amazon SQS prevents other consumers from receiving and processing a message after another consumer has received it. Must be between 0 seconds and 12 hours. You can specify a string like '5 minutes' or '3 hours'. See also: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html", "30 seconds"),
            "fifo" => leaf.call("boolean", "Designate this queue as a FIFO queue. Messages in this queue must explicitly specify MessageGroupId. This cannot be changed once instantiated. This feature is not available in all regions. See also: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html#FIFO-queues-understanding-logic", false),
            "dedup" => leaf.call("boolean", "Enables content-based deduplication. When ContentBasedDeduplication is in effect, messages with identical content sent within the deduplication interval are treated as duplicates and only one copy of the message is delivered. This feature is not available in all regions. See also: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html#FIFO-queues-exactly-once-processing", false),
            "failqueue" => {
              "type" => "object",
              "description" => "Target queue for messages that can't be processed (consumed) successfully. See also: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html",
              "properties" => failqueue_props
            },
            "kms" => {
              "type" => "object",
              "description" => "Use an Amazon KMS key to encrypt and decrypt messages in the background. This feature is not available in all regions. https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-server-side-encryption.html#sqs-sse-key-terms",
              "required" => ["key_id", "key_reuse_period"],
              "properties" => kms_props
            }
          }

          [[], properties]
        end
validateConfig(queue, configurator) click to toggle source

Cloud-specific pre-processing of {MU::Config::BasketofKittens::msg_queues}, bare and unvalidated. @param queue [Hash]: The resource to process and validate @param configurator [MU::Config]: The overall deployment configurator of which this resource is a member @return [Boolean]: True if validation succeeded, False otherwise

# File modules/mu/providers/aws/msg_queue.rb, line 319
# Cloud-specific pre-processing of {MU::Config::BasketofKittens::msg_queues}, bare and unvalidated.
#
# Besides validating, this rewrites +queue+ in place: 'max_msg_size' is
# converted from kB to bytes, the duration fields are replaced by whatever
# ChronicDuration.parse returns for them, and a failqueue with 'create' set
# gets a sibling queue inserted into the configurator.
# @param queue [Hash]: The resource to process and validate
# @param configurator [MU::Config]: The overall deployment configurator of which this resource is a member
# @return [Boolean]: True if validation succeeded, False otherwise
def self.validateConfig(queue, configurator)
  ok = true

  if queue['failqueue']
    wants_create = queue['failqueue']['create']
    target_name = queue['failqueue']['name']
    if (!wants_create and !target_name) or (wants_create and target_name)
      MU.log "Must set exactly one of 'create' or 'name' in the failqueue of MsgQueue #{queue['name']}.", MU::ERR
      ok = false
    end
    if queue['failqueue']['retries_before_fail'] < 1 or
       queue['failqueue']['retries_before_fail'] > 1000
      MU.log "'retries_before_fail' must be between 1 and 1000 in MsgQueue #{queue['name']}.", MU::ERR
      ok = false
    end
    if wants_create
      # Clone ourselves, minus the failqueue stanza, as the dead-letter target
      failq = queue.dup
      failq['name'] += "-fail"
      failq.delete("failqueue")
      ok = false if !configurator.insertKitten(failq, "msg_queues")
      queue['failqueue']['name'] = failq['name']
      MU::Config.addDependency(queue, failq["name"], "msg_queue")
    elsif target_name
      if configurator.haveLitterMate?(target_name, "msg_queue")
        MU::Config.addDependency(queue, target_name, "msg_queue")
      else
        # find answers with a (possibly empty) Hash, so test for emptiness
        # rather than truthiness; a stale URL may also surface as an exception
        failq = begin
          MU::Cloud::AWS::MsgQueue.find(cloud_id: target_name, region: queue['region'])
        rescue ::Aws::SQS::Errors::NonExistentQueue
          nil
        end
        if failq.nil? or failq.empty?
          MU.log "Could not find an SQS queue named #{target_name} for failqueue in MsgQueue '#{queue['name']}'", MU::ERR
          ok = false
        end
      end
    end
  end

  if queue['max_msg_size'] < 1 or queue['max_msg_size'] > 256
    MU.log "Must specify a 'max_msg_size' value between 1 and 256 in MsgQueue #{queue['name']}.", MU::ERR
    ok = false
  end
  queue['max_msg_size'] *= 1024 # the API takes it in bytes

  # Each entry: field name, smallest and largest allowed value, and the complaint to log
  [
    ['retain', 60, 1209600, "Must specify a 'retain' value between 1 minute and 14 days in MsgQueue #{queue['name']}."],
    ['delay', 0, 900, "'delay' value must be between 0 seconds and 15 minutes in MsgQueue #{queue['name']}."],
    ['receive_timeout', 0, 20, "'receive_timeout' value must be between 0 seconds and 20 seconds in MsgQueue #{queue['name']}."],
    ['visibility_timeout', 0, 43200, "'visibility_timeout' value must be between 0 seconds and 12 hours in MsgQueue #{queue['name']}."]
  ].each { |field, min, max, complaint|
    queue[field] = ChronicDuration.parse(queue[field], :keep_zero => true)
    if !queue[field] or queue[field] < min or queue[field] > max
      MU.log complaint, MU::ERR
      ok = false
    end
  }

  if queue['kms']
    good_regions = ["us-east-1", "us-east-2", "us-west-2"]
    if !good_regions.include?(queue['region'])
      MU.log "KMS SQS encryption isn't supported in all regions, and #{queue['region']} wasn't on the list last we checked. Queue '#{queue['name']}' may not work.", MU::WARN, details: good_regions
    end
    queue['kms']['key_reuse_period'] = ChronicDuration.parse(queue['kms']['key_reuse_period'], :keep_zero => true)
    if !queue['kms']['key_reuse_period'] or queue['kms']['key_reuse_period'] < 60 or queue['kms']['key_reuse_period'] > 86400
      MU.log "KMS 'key_reuse_period' value must be between 60 seconds and 24 hours in MsgQueue #{queue['name']}.", MU::ERR
      ok = false
    end
    begin
      MU::Cloud::AWS.kms(region: queue['region']).describe_key(key_id: queue['kms']['key_id'])
    rescue Aws::KMS::Errors::NotFoundException
      MU.log "KMS key '#{queue['kms']['key_id']}' specified in Queue '#{queue['name']}' was not found.", MU::ERR, details: "Key IDs are of the form bf64a093-2c3d-46fa-0d4f-8232fa7ed53. Keys can be created at https://console.aws.amazon.com/iam/home#/encryptionKeys/#{queue['region']}"
      ok = false
    end
  end

  good_regions = ["us-east-1", "us-east-2", "us-west-2", "eu-west-1"]

  if (queue['fifo'] or queue['dedup']) and !good_regions.include?(queue['region'])
    MU.log "Fifo queues aren't supported in all regions, and #{queue['region']} wasn't on the list last we checked. MsgQueue '#{queue['name']}' may not work.", MU::WARN, details: good_regions
  end

  # TODO have IAM API validate queue['iam_policy'] if any is set

  ok
end

Public Instance Methods

arn() click to toggle source

Canonical Amazon Resource Name (ARN) for this resource @return [String]

# File modules/mu/providers/aws/msg_queue.rb, line 73
# Canonical Amazon Resource Name (ARN) for this queue.
#
# SQS ARNs end in the queue's name, whereas +@cloud_id+ holds the queue URL
# once #create has run, so only the final path segment of +@cloud_id+ is
# used. A +@cloud_id+ without slashes is used as-is.
# @return [String]
def arn
  partition = MU::Cloud::AWS.isGovCloud?(@region) ? "aws-us-gov" : "aws"
  account = MU::Cloud::AWS.credToAcct(@credentials)
  queue_name = @cloud_id.split("/").last
  "arn:"+partition+":sqs:"+@region+":"+account+":"+queue_name
end
cloud_desc(use_cache: true) click to toggle source

Retrieve the AWS descriptor for this SQS queue. AWS doesn't exactly provide one; if you want real information for SQS ask notify() @return [Hash]: AWS doesn't return anything but the SQS URL, so supplement with attributes

# File modules/mu/providers/aws/msg_queue.rb, line 81
# Retrieve the AWS descriptor for this SQS queue, as produced by
# {MU::Cloud::AWS::MsgQueue.find} for our +@cloud_id+.
# @param use_cache [Boolean]: If true, return the previously fetched descriptor when there is one
# @return [Hash, nil]: Whatever find returns for this queue, or nil when +@cloud_id+ is not set
def cloud_desc(use_cache: true)
  return @cloud_desc_cache if @cloud_desc_cache and use_cache
  # Without a cloud id there is nothing to look up. (A name-based fallback
  # search used to follow this guard, but could never be reached.)
  return nil if !@cloud_id

  @cloud_desc_cache = MU::Cloud::AWS::MsgQueue.find(
    cloud_id: @cloud_id.dup,
    region: @region,
    credentials: @credentials
  )
end
create() click to toggle source

Called automatically by {MU::Deploy#createResources}

# File modules/mu/providers/aws/msg_queue.rb, line 29
# Create the SQS queue for this resource and record its URL as +@cloud_id+.
# Queues whose generated attributes include 'FifoQueue' get a ".fifo" name suffix.
# Called automatically by {MU::Deploy#createResources}
def create
  queue_attrs = genQueueAttrs
  queue_name = queue_attrs['FifoQueue'] ? @mu_name + ".fifo" : @mu_name

  MU.log "Creating SQS queue #{queue_name}", details: queue_attrs
  sqs = MU::Cloud::AWS.sqs(region: @region, credentials: @credentials)
  created = sqs.create_queue(queue_name: queue_name, attributes: queue_attrs)
  sleep 1
  MU.log "SQS queue #{@config['name']} is at: #{created.queue_url}", MU::SUMMARY
  @cloud_id = created.queue_url
end
groom() click to toggle source

Called automatically by {MU::Deploy#createResources}

# File modules/mu/providers/aws/msg_queue.rb, line 46
        # Called automatically by {MU::Deploy#createResources}. Applies tags, then
        # pushes the configured queue attributes only if they differ from the
        # attributes currently reported for the queue.
        def groom
          tagQueue

          # notify hands back descriptors keyed by queue URL; the attributes
          # to compare against live one level down, in the first descriptor.
          described = notify || {}
          cur_attrs = described.values.find { |v| v.is_a?(Hash) } || described
          new_attrs = genQueueAttrs

          changed = new_attrs.any? { |k, v|
            !cur_attrs.has_key?(k) or cur_attrs[k] != v
          }
          if changed
            MU.log "Updating SQS queue #{@mu_name}", MU::NOTICE, details: new_attrs
            MU::Cloud::AWS.sqs(region: @region, credentials: @credentials).set_queue_attributes(
              queue_url: @cloud_id,
              attributes: new_attrs
            )
          end

        end
notify() click to toggle source

Return the metadata for this MsgQueue @return [Hash]

# File modules/mu/providers/aws/msg_queue.rb, line 109
# Return the metadata for this MsgQueue. #cloud_desc is invoked first (its
# result is not used here), then the queue is looked up afresh through
# {MU::Cloud::AWS::MsgQueue.find}.
# @return [Hash]
def notify
  cloud_desc
  MU::Cloud::AWS::MsgQueue.find(cloud_id: @cloud_id, region: @region, credentials: @credentials)
end

Private Instance Methods

genQueueAttrs() click to toggle source
# File modules/mu/providers/aws/msg_queue.rb, line 415
        # Translate this resource's configuration into the attribute Hash that
        # the SQS create_queue/set_queue_attributes calls take. All values are
        # Strings.
        # @return [Hash<String,String>]
        # @raise [MU::MuError] if a configured failqueue cannot be resolved to a queue ARN
        def genQueueAttrs
          attrs = {
            "MaximumMessageSize" => @config['max_msg_size'].to_s,
            "MessageRetentionPeriod" => @config['retain'].to_s,
            "DelaySeconds" => @config['delay'].to_s,
            "ReceiveMessageWaitTimeSeconds" => @config['receive_timeout'].to_s
          }
          # Send the configured visibility timeout too, when there is one
          if !@config['visibility_timeout'].nil?
            attrs["VisibilityTimeout"] = @config['visibility_timeout'].to_s
          end

          if @config['failqueue']
            sibling = @deploy.findLitterMate(type: "msg_queue", name: @config['failqueue']['name'])
            # resolve sibling queues to something useful
            id = sibling ? sibling.cloud_id : @config['failqueue']['name']
            # Never search with a blank id: that would describe every queue
            found = if id
              MU::Cloud::AWS::MsgQueue.find(cloud_id: id, region: @region, credentials: @credentials)
            end
            # find keys its results by queue URL; the attributes are one level down
            desc = found.values.first if found
            if !desc or !desc["QueueArn"]
              raise MU::MuError, "Failed to get cloud descriptor for SQS queue #{@config['failqueue']['name']}"
            end
            rdr_pol = {
              "deadLetterTargetArn" => desc["QueueArn"],
              "maxReceiveCount" => @config['failqueue']['retries_before_fail']
            }
            attrs["RedrivePolicy"] = JSON.generate(rdr_pol)
          end

          # These aren't supported in most regions, and will fail loudly and
          # spectacularly if you try to use them in the forbidden lands.
          if @config['fifo'] or @config['dedup']
            attrs["FifoQueue"] = "true" # dedup enables fifo implicitly
            attrs["ContentBasedDeduplication"] = @config['dedup'] ? "true" : "false"
          end
          if @config['kms']
            attrs["KmsMasterKeyId"] = @config['kms']['key_id'].to_s
            attrs["KmsDataKeyReusePeriodSeconds"] = @config['kms']['key_reuse_period'].to_s
          end

# TODO this doesn't work as either an ARN, short identifier, or full JSON policy descriptor. Docs are vague. Need to ask AWS.
#          if @config['iam_policy']
#            attrs["Policy"] = JSON.generate(@config['iam_policy'])
#          end

          attrs
        end
tagQueue(url = nil) click to toggle source
# File modules/mu/providers/aws/msg_queue.rb, line 459
# Apply our Name tag, the standard Mu tags, and any optional/custom tags
# from the configuration to this queue.
# @param url [String]: The queue URL to tag; looked up via #cloud_desc when omitted
# @return [void]
# @raise [MU::MuError] if no queue URL was given and none could be determined
def tagQueue(url = nil)
  tags = {}
  tags["Name"] = @mu_name

  MU::MommaCat.listStandardTags.each_pair { |name, value|
    tags[name] = value
  }

  if @config['optional_tags']
    MU::MommaCat.listOptionalTags.each_pair { |name, value|
      tags[name] = value
    }
  end

  if @config['tags']
    @config['tags'].each { |tag|
      tags[tag['key']] = tag['value']
    }
  end
  if !url
    desc = cloud_desc
    if desc
      url = desc["Url"]
      if !url
        # The descriptor may instead be keyed by queue URL, with the
        # attributes (including "Url") one level down
        nested = desc.values.find { |v| v.is_a?(Hash) and v["Url"] }
        url = nested["Url"] if nested
      end
    end
    if !url
      raise MU::MuError, "Can't tag SQS queue, failed to retrieve queue_url"
    end
  end

  begin
    MU::Cloud::AWS.sqs(region: @region, credentials: @credentials).tag_queue(
      queue_url: url,
      tags: tags
    )
  rescue ::Aws::SQS::Errors::UnsupportedOperation, NameError => e
    MU.log "We appear to be in a region that does not support SQS tagging. Skipping tags for #{@mu_name}", MU::NOTICE, details: e.message
  end
end