class MU::Cloud::AWS::CacheCluster

A cache cluster as configured in {MU::Config::BasketofKittens::cache_clusters}

Public Class Methods

cleanup(noop: false, deploy_id: MU.deploy_id, ignoremaster: false, credentials: nil, region: MU.curRegion, flags: {}) click to toggle source

Called by {MU::Cleanup}. Locates resources that were created by the currently-loaded deployment and purges them. @param noop [Boolean]: If true, will only print what would be done. @param ignoremaster [Boolean]: If true, will remove resources not flagged as originating from this Mu server. @param region [String]: The cloud provider's region in which to operate. @return [void]

# File modules/mu/providers/aws/cache_cluster.rb, line 572
def self.cleanup(noop: false, deploy_id: MU.deploy_id, ignoremaster: false, credentials: nil, region: MU.curRegion, flags: {})
  skipsnapshots = flags["skipsnapshots"]
  all_clusters = MU::Cloud::AWS.elasticache(credentials: credentials, region: region).describe_cache_clusters
  our_clusters = []
  our_replication_group_ids = []

  # Because we can't run list_tags_for_resource on a cache cluster that isn't in "available" state we're loading the deploy to make sure we have a cache cluster to cleanup.
  # To ensure we don't miss cache clusters that have been terminated mid creation we'll load the 'original_config'. We might want to find a better approach for this.
  deploy = MU::MommaCat.getLitter(deploy_id)
  if deploy.original_config && deploy.original_config.has_key?("cache_clusters") && !deploy.original_config["cache_clusters"].empty?

    # The ElastiCache API and documentation are a mess, the replication group ARN resource_type is not documented, and is not easily guessable.
    # So instead of searching for replication groups directly we'll get their IDs from the cache clusters.
    all_clusters.cache_clusters.each { |cluster|
      cluster_id = cluster.cache_cluster_id

      # ElastiCache API is buggy... It will throw a CacheClusterNotFound if we run list_tags_for_resource on a cahe cluster that isn't in an "available" state.
      if cluster.cache_cluster_status != "available"
        if %w{deleting deleted}.include?(cluster.cache_cluster_status)
          # The cahe cluster might not be ours so don't notify us about it.
          next
        else
          # We can replace this with an AWS waiter.
          loop do
            MU.log "Waiting for #{cluster_id} to be in a removable state", MU::NOTICE
            cluster = MU::Cloud::AWS::CacheCluster.getCacheClusterById(cluster_id, region: region)
            break if !cluster
            break unless %w{creating modifying backing-up}.include?(cluster.cache_cluster_status)
            sleep 60
          end
        end
      end

      arn = MU::Cloud::AWS::CacheCluster.getARN(cluster_id, "cluster", "elasticache", region: region, credentials: credentials)
      attempts = 0

      begin
        tags = MU::Cloud::AWS.elasticache(credentials: credentials, region: region).list_tags_for_resource(resource_name: arn).tag_list
      rescue Aws::ElastiCache::Errors::CacheClusterNotFound => e
        if attempts < 5
          MU.log "Can't get tags for #{cluster_id}, retrying a few times in case of a lagging resource", MU::WARN
          MU.log "arn #{arn}", MU::WARN
          attempts += 1
          sleep 30
          retry
        else
          raise MuError, "Failed to get tags for cache cluster #{cluster_id}, MU-ID #{deploy_id}: #{e.inspect}"
        end
      end

      found_muid = false
      found_master = false
      tags.each { |tag|
        found_muid = true if tag.key == "MU-ID" && tag.value == deploy_id
        found_master = true if tag.key == "MU-MASTER-IP" && tag.value == MU.mu_public_ip
      }
      next if !found_muid

      delete =
        if ignoremaster && found_muid
          true
        elsif !ignoremaster && found_muid && found_master
          true
        else
          false
        end

      if delete
        cluster.replication_group_id ? our_replication_group_ids << cluster.replication_group_id : our_clusters << cluster
      end
    }

    threads = []

    # Make sure we have only uniqe replication group IDs
    our_replication_group_ids = our_replication_group_ids.uniq
    if !our_replication_group_ids.empty?
      our_replication_group_ids.each { |group_id|
        replication_group = MU::Cloud::AWS::CacheCluster.getCacheReplicationGroupById(group_id, region: region)
        parent_thread_id = Thread.current.object_id
        threads << Thread.new(replication_group) { |myrepl_group|
          MU.dupGlobals(parent_thread_id)
          Thread.abort_on_exception = true
          terminate_replication_group(myrepl_group, noop: noop, skipsnapshots: skipsnapshots, region: region, credentials: credentials)
        }
      }
    end

    # Hmmmm. Do we need to have seperate thread groups for clusters and replication groups?
    if !our_clusters.empty?
      our_clusters.each { |cluster|
        parent_thread_id = Thread.current.object_id
        threads << Thread.new(cluster) { |mycluster|
          MU.dupGlobals(parent_thread_id)
          Thread.abort_on_exception = true
          terminate_cache_cluster(mycluster, noop: noop, skipsnapshots: skipsnapshots, region: region, credentials: credentials)
        }
      }
    end

    # Wait for all of the cache cluster  and replication groups to finish cleanup before proceeding
    threads.each { |t|
      t.join
    }
  end
end
find(**args) click to toggle source

Locate an existing Cache Cluster or Cache Clusters and return an array containing matching AWS resource descriptors for those that match. @return [Hash<String,OpenStruct>]: The cloud provider's complete descriptions of matching Cache Clusters.

# File modules/mu/providers/aws/cache_cluster.rb, line 43
def self.find(**args)
  found = {}

  if args[:cloud_id]
    cache_cluster = MU::Cloud::AWS::CacheCluster.getCacheClusterById(args[:cloud_id], region: args[:region], credentials: args[:credentials])
    found[args[:cloud_id]] = cache_cluster if cache_cluster
  end

  MU::Cloud::AWS.elasticache(region: args[:region], credentials: args[:credentials]).describe_cache_clusters.cache_clusters.each { |cc|
    if args[:tag_value]
      resp = MU::Cloud::AWS.elasticache(region: args[:region], credentials: args[:credentials]).list_tags_for_resource(
        resource_name: MU::Cloud::AWS::CacheCluster.getARN(cc.cache_cluster_id, "cluster", "elasticache", region: args[:region], credentials: args[:credentials])
      )
      if resp && resp.tag_list && !resp.tag_list.empty?
        resp.tag_list.each { |tag|
          found[cc.cache_cluster_id] = cc if tag.key == args[:tag_key] and tag.value == args[:tag_value]
        }
      end
    else
      found[cc.cache_cluster_id] = cc
    end
  }

  return found
end
getARN(resource, resource_type, client_type, region: MU.curRegion, credentials: nil) click to toggle source

Construct an Amazon Resource Name for an AWS resource. Some APIs require this identifier in order to do things that other APIs can do with shorthand. @param resource [String]: The name of the resource @param client_type [String]: The name of the client (eg. elasticache, rds, ec2, s3) @param resource_type [String]: The type of the resource @param region [String]: The region in which the resource resides. @param credentials [String]: The account in which the resource resides. @return [String]

# File modules/mu/providers/aws/cache_cluster.rb, line 77
def self.getARN(resource, resource_type, client_type, region: MU.curRegion, credentials: nil)
  aws_str = MU::Cloud::AWS.isGovCloud?(region) ? "aws-us-gov" : "aws"
  "arn:#{aws_str}:#{client_type}:#{region}:#{MU::Cloud::AWS.credToAcct(credentials)}:#{resource_type}:#{resource}"
end
getCacheClusterById(cc_id, region: MU.curRegion, credentials: nil) click to toggle source

Retrieve the complete cloud provider description of a cache cluster. @param cc_id [String]: The cloud provider's identifier for this cache cluster. @param region [String]: The cloud provider's region. @return [OpenStruct]

# File modules/mu/providers/aws/cache_cluster.rb, line 387
def self.getCacheClusterById(cc_id, region: MU.curRegion, credentials: nil)
  begin
    MU::Cloud::AWS.elasticache(region: region, credentials: credentials).describe_cache_clusters(cache_cluster_id: cc_id).cache_clusters.first
  rescue Aws::ElastiCache::Errors::CacheClusterNotFound
    nil
  end
end
getCacheReplicationGroupById(repl_group_id, region: MU.curRegion, credentials: nil) click to toggle source

Retrieve the complete cloud provider description of a cache replication group. @param repl_group_id [String]: The cloud provider's identifier for this cache replication group. @param region [String]: The cloud provider's region. @return [OpenStruct]

# File modules/mu/providers/aws/cache_cluster.rb, line 399
def self.getCacheReplicationGroupById(repl_group_id, region: MU.curRegion, credentials: nil)
  MU::Cloud::AWS.elasticache(region: region, credentials: credentials).describe_replication_groups(replication_group_id: repl_group_id).replication_groups.first
end
isGlobal?() click to toggle source

Does this resource type exist as a global (cloud-wide) artifact, or is it localized to a region/zone? @return [Boolean]

# File modules/mu/providers/aws/cache_cluster.rb, line 557
def self.isGlobal?
  false
end
new(**args) click to toggle source

Initialize this cloud resource object. Calling super will invoke the initializer defined under {MU::Cloud}, which should set the attribtues listed in {MU::Cloud::PUBLIC_ATTRS} as well as applicable dependency shortcuts, like +@vpc+, for us. @param args [Hash]: Hash of named arguments passed via Ruby's double-splat

Calls superclass method
# File modules/mu/providers/aws/cache_cluster.rb, line 23
def initialize(**args)
  super
  @mu_name ||=
    if @config["create_replication_group"]
      @deploy.getResourceName(@config["name"], max_length: 16, need_unique_string: true)
    else
      @deploy.getResourceName(@config["name"], max_length: 20, need_unique_string: true)
    end

  @mu_name.gsub!(/(--|-$)/i, "")
end
quality() click to toggle source

Denote whether this resource implementation is experiment, ready for testing, or ready for production use.

# File modules/mu/providers/aws/cache_cluster.rb, line 563
def self.quality
  MU::Cloud::RELEASE
end
schema(_config) click to toggle source

Cloud-specific configuration properties. @param _config [MU::Config]: The calling MU::Config object @return [Array<Array,Hash>]: List of required fields, and json-schema Hash of cloud-specific configuration parameters for this resource

# File modules/mu/providers/aws/cache_cluster.rb, line 682
def self.schema(_config)
  toplevel_required = []
  schema = {
    "create_replication_group" => {
      "type" => "boolean",
      "description" => "Create a replication group; will be set automatically if +engine+ is +redis+ and +node_count+ is greated than one."
    },
    "ingress_rules" => MU::Cloud.resourceClass("AWS", "FirewallRule").ingressRuleAddtlSchema
  }
  [toplevel_required, schema]
end
validateConfig(cache, configurator) click to toggle source

Cloud-specific pre-processing of {MU::Config::BasketofKittens::cache_clusters}, bare and unvalidated. @param cache [Hash]: The resource to process and validate @param configurator [MU::Config]: The overall deployment configurator of which this resource is a member @return [Boolean]: True if validation succeeded, False otherwise

# File modules/mu/providers/aws/cache_cluster.rb, line 698
def self.validateConfig(cache, configurator)
  ok = true

  if !cache['vpc']
    siblings = configurator.haveLitterMate?(nil, "vpcs", has_multiple: true)
    if siblings.size == 1
      MU.log "CacheCluster #{cache['name']} did not declare a VPC. Inserting into sibling VPC #{siblings[0]['name']}.", MU::WARN
      cache["vpc"] = {
        "name" => siblings[0]['name'],
        "subnet_pref" => "all_private"
      }
    elsif MU::Cloud::AWS.hosted? and MU::Cloud::AWS.myVPCObj
      cache["vpc"] = {
        "id" => MU.myVPC,
        "subnet_pref" => "all_private"
      }
    else
      MU.log "CacheCluster #{cache['name']} must declare a VPC", MU::ERR
      ok = false
    end

    # Re-insert ourselves with this modification so that our child
    # resources get this VPC we just shoved in
    if ok and cache['vpc']
      cache.delete("#MU_VALIDATED")
      return configurator.insertKitten(cache, "cache_clusters", overwrite: true)
    end
  end

  if cache.has_key?("parameter_group_parameters") && cache["parameter_group_family"].nil?
    MU.log "parameter_group_family must be set when setting parameter_group_parameters", MU::ERR
    ok = false
  end
  if cache["engine"] == "redis"
    # We aren't required to create a cache replication group for a single redis cache cluster,
    # however AWS console does exactly that, ss such we will follow that behavior.
    if cache["node_count"] > 1
      cache["create_replication_group"] = true
      cache["automatic_failover"] = cache["multi_az"]
    end

    # Some instance types don't support snapshotting
    if %w{cache.t2.micro cache.t2.small cache.t2.medium}.include?(cache["size"])
      if cache.has_key?("snapshot_retention_limit") || cache.has_key?("snapshot_window")
        MU.log "Can't set snapshot_retention_limit or snapshot_window on #{cache["size"]}", MU::ERR
        ok = false
      end
    end
  elsif cache["engine"] == "memcached"
    cache["create_replication_group"] = false

    if cache["node_count"] > 20
      MU.log "#{cache['engine']} supports up to 20 nodes per cache cluster", MU::ERR
      ok = false
    end

    # memcached doesn't support snapshots
    if cache.has_key?("snapshot_retention_limit") || cache.has_key?("snapshot_window")
      MU.log "Can't set snapshot_retention_limit or snapshot_window on #{cache["engine"]}", MU::ERR
      ok = false
    end
  end

  ok
end

Private Class Methods

clusterCreateSnap(cluster_id, region, credentials) click to toggle source
# File modules/mu/providers/aws/cache_cluster.rb, line 802
def self.clusterCreateSnap(cluster_id, region, credentials)
  MU.log "Terminating #{cluster_id}. Final snapshot name: #{cluster_id}-mufinal"
  MU::Cloud::AWS.elasticache(region: region, credentials: credentials).delete_cache_cluster(cache_cluster_id: cluster_id, final_snapshot_identifier: "#{cluster_id}-MUfinal")
end
clusterSkipSnap(cluster_id, region, credentials) click to toggle source
# File modules/mu/providers/aws/cache_cluster.rb, line 796
def self.clusterSkipSnap(cluster_id, region, credentials)
  # We're calling this several times so lets declare it once
  MU.log "Terminating #{cluster_id}. Not saving final snapshot"
  MU::Cloud::AWS.elasticache(region: region, credentials: credentials).delete_cache_cluster(cache_cluster_id: cluster_id)
end
createSnap(repl_group_id, region, credentials) click to toggle source
# File modules/mu/providers/aws/cache_cluster.rb, line 911
def self.createSnap(repl_group_id, region, credentials)
  MU.log "Terminating #{repl_group_id}. Final snapshot name: #{repl_group_id}-mufinal"
  MU::Cloud::AWS.elasticache(region: region, credentials: credentials).delete_replication_group(
    replication_group_id: repl_group_id,
    retain_primary_cluster: false,
    final_snapshot_identifier: "#{repl_group_id}-mufinal"
  )
end
delete_parameter_group(parameter_group_id, region: MU.curRegion, credentials: nil) click to toggle source

Remove a Cache Cluster Parameter Group. @param parameter_group_id [string]: The cloud provider's ID of the cache cluster parameter group. @param region [String]: The cloud provider's region in which to operate. @return [void]

# File modules/mu/providers/aws/cache_cluster.rb, line 999
def self.delete_parameter_group(parameter_group_id, region: MU.curRegion, credentials: nil)
  retries ||= 0
  MU.log "Deleting parameter group #{parameter_group_id}"
  MU::Cloud::AWS.elasticache(region: region, credentials: credentials).delete_cache_parameter_group(
    cache_parameter_group_name: parameter_group_id
  )
rescue Aws::ElastiCache::Errors::CacheParameterGroupNotFound
  MU.log "Parameter group #{parameter_group_id} disappeared before we could remove it", MU::WARN
rescue Aws::ElastiCache::Errors::InvalidCacheParameterGroupState => e
  if retries < 5
    MU.log "Parameter group #{parameter_group_id} is not in a removable state, retrying", MU::WARN
    retries += 1
    sleep 30
    retry
  else
    MU.log "Parameter group #{parameter_group_id} is not in a removable state after several retries, giving up. #{e.inspect}", MU::ERR
  end
end
delete_subnet_group(subnet_group_id, region: MU.curRegion, credentials: nil) click to toggle source

Remove a Cache Cluster Subnet Group. @param subnet_group_id [string]: The cloud provider's ID of the cache cluster subnet group. @param region [String]: The cloud provider's region in which to operate. @return [void]

# File modules/mu/providers/aws/cache_cluster.rb, line 977
def self.delete_subnet_group(subnet_group_id, region: MU.curRegion, credentials: nil)
  retries ||= 0
  MU.log "Deleting Subnet group #{subnet_group_id}"
  MU::Cloud::AWS.elasticache(region: region, credentials: credentials).delete_cache_subnet_group(cache_subnet_group_name: subnet_group_id)
rescue Aws::ElastiCache::Errors::CacheSubnetGroupNotFoundFault
  MU.log "Subnet group #{subnet_group_id} disappeared before we could remove it", MU::WARN
rescue Aws::ElastiCache::Errors::CacheSubnetGroupInUse => e
  if retries < 5
    MU.log "Subnet group #{subnet_group_id} is not in a removable state, retrying", MU::WARN
    retries += 1
    sleep 30
    retry
  else
    MU.log "Subnet group #{subnet_group_id} is not in a removable state after several retries, giving up. #{e.inspect}", MU::ERR
  end
end
skipSnap(repl_group_id, region, credentials) click to toggle source
# File modules/mu/providers/aws/cache_cluster.rb, line 902
def self.skipSnap(repl_group_id, region, credentials)
  # We're calling this several times so lets declare it once
  MU.log "Terminating #{repl_group_id}. Not saving final snapshot"
  MU::Cloud::AWS.elasticache(region: region, credentials: credentials).delete_replication_group(
    replication_group_id: repl_group_id,
    retain_primary_cluster: false
  )
end
terminate_cache_cluster(cluster, noop: false, skipsnapshots: false, region: MU.curRegion, credentials: nil) click to toggle source

Remove a Cache Cluster and associated artifacts @param cluster [OpenStruct]: The cloud provider's description of the Cache Cluster artifact. @param noop [Boolean]: If true, will only print what would be done. @param skipsnapshots [Boolean]: If true, will not create a last snapshot before terminating the Cache Cluster. @param region [String]: The cloud provider's region in which to operate. @return [void]

# File modules/mu/providers/aws/cache_cluster.rb, line 772
def self.terminate_cache_cluster(cluster, noop: false, skipsnapshots: false, region: MU.curRegion, credentials: nil)
  raise MuError, "terminate_cache_cluster requires a non-nil cache cluster descriptor" if cluster.nil? || cluster.empty?

  cluster_id = cluster.cache_cluster_id
  subnet_group = cluster.cache_subnet_group_name
  parameter_group = cluster.cache_parameter_group.cache_parameter_group_name

  # hmmmmm we can use an AWS waiter for this...
  unless cluster.cache_cluster_status == "available"
    loop do
      MU.log "Waiting for #{cluster_id} to be in a removable state...", MU::NOTICE
      cluster = MU::Cloud::AWS::CacheCluster.getCacheClusterById(cluster_id, region: region, credentials: credentials)
      break unless %w{creating modifying backing-up}.include?(cluster.cache_cluster_status)
      sleep 60
    end
  end

  # The API is broken, cluster.cache_nodes is returnning an empty array, and the only URL we can get is the config one with cluster.configuration_endpoint.address.
  # MU::Cloud.resourceClass("AWS", "DNSZone").genericMuDNSEntry(name: cluster_id, target: , cloudclass: MU::Cloud::CacheCluster, delete: true)
  
  if %w{deleting deleted}.include?(cluster.cache_cluster_status)
    MU.log "#{cluster_id} has already been terminated", MU::WARN
  else          
    unless noop
      def self.clusterSkipSnap(cluster_id, region, credentials)
        # We're calling this several times so lets declare it once
        MU.log "Terminating #{cluster_id}. Not saving final snapshot"
        MU::Cloud::AWS.elasticache(region: region, credentials: credentials).delete_cache_cluster(cache_cluster_id: cluster_id)
      end

      def self.clusterCreateSnap(cluster_id, region, credentials)
        MU.log "Terminating #{cluster_id}. Final snapshot name: #{cluster_id}-mufinal"
        MU::Cloud::AWS.elasticache(region: region, credentials: credentials).delete_cache_cluster(cache_cluster_id: cluster_id, final_snapshot_identifier: "#{cluster_id}-MUfinal")
      end

      retries = 0
      begin
        if cluster.engine == "memcached"
          clusterSkipSnap(cluster_id, region, credentials)
        else
          skipsnapshots ? clusterSkipSnap(cluster_id, region, credentials) : clusterCreateSnap(cluster_id, region, credentials)
        end
      rescue Aws::ElastiCache::Errors::InvalidCacheClusterState => e
        if retries < 5
          MU.log "#{cluster_id} is not in a removable state, retrying several times", MU::WARN
          retries += 1
          sleep 30
          retry
        else
          MU.log "#{cluster_id} is not in a removable state after several retries, giving up. #{e.inspect}", MU::ERR
          return
        end
      rescue Aws::ElastiCache::Errors::SnapshotAlreadyExistsFault
        MU.log "Snapshot #{cluster_id}-MUfinal already exists", MU::WARN
        clusterSkipSnap(cluster_id, region, credentials)
      rescue Aws::ElastiCache::Errors::SnapshotQuotaExceededFault
        MU.log "Snapshot quota exceeded while deleting #{cluster_id}", MU::ERR
        clusterSkipSnap(cluster_id, region, credentials)
      end

      wait_start_time = Time.now
      retries = 0
      begin
        MU::Cloud::AWS.elasticache(region: region, credentials: credentials).wait_until(:cache_cluster_deleted, cache_cluster_id: cluster_id) do |waiter|
          waiter.max_attempts = nil
          waiter.before_attempt do |attempts|
            MU.log "Waiting for cache cluster #{cluster_id} to delete..", MU::NOTICE if attempts % 10 == 0
          end
          waiter.before_wait do |_attempts, resp|
            throw :success if resp.cache_clusters.first.cache_cluster_status  == "deleted"
            throw :failure if Time.now - wait_start_time > 1800
          end
        end
      rescue Aws::Waiters::Errors::TooManyAttemptsError => e
        raise MuError, "Waited for #{(Time.now - wait_start_time).round/60*(retries+1)} minutes for cache cluster to delete, giving up. #{e}" if retries > 2
        wait_start_time = Time.now
        retries += 1
        retry
      rescue Aws::Waiters::Errors::FailureStateError
        MU.log "#{cluster_id} disappeared on us"
      end
    end
  end

  MU.log "#{cluster_id} has been terminated"

  unless noop
    delete_subnet_group(subnet_group, region: region, credentials: credentials) if subnet_group
    delete_parameter_group(parameter_group, region: region, credentials: credentials) if parameter_group && !parameter_group.start_with?("default")
  end
end
terminate_replication_group(repl_group, noop: false, skipsnapshots: false, region: MU.curRegion, credentials: nil) click to toggle source

Remove a Cache Cluster Replication Group and associated artifacts @param repl_group [OpenStruct]: The cloud provider's description of the Cache Cluster artifact. @param noop [Boolean]: If true, will only print what would be done. @param skipsnapshots [Boolean]: If true, will not create a last snapshot before terminating the Cache Cluster. @param region [String]: The cloud provider's region in which to operate. @return [void]

# File modules/mu/providers/aws/cache_cluster.rb, line 871
def self.terminate_replication_group(repl_group, noop: false, skipsnapshots: false, region: MU.curRegion, credentials: nil)
  raise MuError, "terminate_replication_group requires a non-nil cache replication group descriptor" if repl_group.nil? || repl_group.empty?

  repl_group_id = repl_group.replication_group_id
  # We're assuming that all clusters in this replication group where created in the same deployment so have the same subnet group, parameter group, etc...
  cluster_id = repl_group.member_clusters.first
  cluster = MU::Cloud::AWS::CacheCluster.getCacheClusterById(cluster_id, region: region)
  subnet_group = cluster.cache_subnet_group_name
  parameter_group = cluster.cache_parameter_group.cache_parameter_group_name

  # hmmmmm we can use an AWS waiter for this...
  unless repl_group.status == "available"
    loop do
      MU.log "Waiting for #{repl_group_id} to be in a removable state...", MU::NOTICE
      repl_group = MU::Cloud::AWS::CacheCluster.getCacheReplicationGroupById(repl_group_id, region: region)
      break unless %w{creating modifying backing-up}.include?(repl_group.status)
      sleep 60
    end
  end

  # What's the likelihood of having more than one node group? maybe iterate over node_groups instead of assuming there is only one?
  MU::Cloud.resourceClass("AWS", "DNSZone").genericMuDNSEntry(name: repl_group_id, target: repl_group.node_groups.first.primary_endpoint.address, cloudclass: MU::Cloud::CacheCluster, delete: true)
  # Assuming we also created DNS records for each of our cluster's read endpoint.
  repl_group.node_groups.first.node_group_members.each { |member|
    MU::Cloud.resourceClass("AWS", "DNSZone").genericMuDNSEntry(name: member.cache_cluster_id, target: member.read_endpoint.address, cloudclass: MU::Cloud::CacheCluster, delete: true)
  }
  
  if %w{deleting deleted}.include?(repl_group.status)
    MU.log "#{repl_group_id} has already been terminated", MU::WARN
  else
    unless noop
      def self.skipSnap(repl_group_id, region, credentials)
        # We're calling this several times so lets declare it once
        MU.log "Terminating #{repl_group_id}. Not saving final snapshot"
        MU::Cloud::AWS.elasticache(region: region, credentials: credentials).delete_replication_group(
          replication_group_id: repl_group_id,
          retain_primary_cluster: false
        )
      end

      def self.createSnap(repl_group_id, region, credentials)
        MU.log "Terminating #{repl_group_id}. Final snapshot name: #{repl_group_id}-mufinal"
        MU::Cloud::AWS.elasticache(region: region, credentials: credentials).delete_replication_group(
          replication_group_id: repl_group_id,
          retain_primary_cluster: false,
          final_snapshot_identifier: "#{repl_group_id}-mufinal"
        )
      end

      retries = 0
      begin
        skipsnapshots ? skipSnap(repl_group_id, region, credentials) : createSnap(repl_group_id, region, credentials)
      rescue Aws::ElastiCache::Errors::InvalidReplicationGroupState => e
        if retries < 5
          MU.log "#{repl_group_id} is not in a removable state, retrying several times", MU::WARN
          retries += 1
          sleep 30
          retry
        else
          MU.log "#{repl_group_id} is not in a removable state after several retries, giving up. #{e.inspect}", MU::ERR
          return
        end
      rescue Aws::ElastiCache::Errors::SnapshotAlreadyExistsFault
        MU.log "Snapshot #{repl_group_id}-MUfinal already exists", MU::WARN
        skipSnap(repl_group_id, region, credentials)
      rescue Aws::ElastiCache::Errors::SnapshotQuotaExceededFault
        MU.log "Snapshot quota exceeded while deleting #{repl_group_id}", MU::ERR
        skipSnap(repl_group_id, region, credentials)
      end

      wait_start_time = Time.now
      retries = 0
      begin
        MU::Cloud::AWS.elasticache(region: region).wait_until(:replication_group_deleted, replication_group_id: repl_group_id) do |waiter|
          waiter.max_attempts = nil
          waiter.before_attempt do |attempts|
            MU.log "Waiting for #{repl_group_id} to delete..", MU::NOTICE if attempts % 10 == 0
          end
          waiter.before_wait do |_attempts, resp|
            throw :success if resp.replication_groups.first.status == "deleted"
            throw :failure if Time.now - wait_start_time > 1800
          end
        end
      rescue Aws::Waiters::Errors::TooManyAttemptsError => e
        raise MuError, "Waited for #{(Time.now - wait_start_time).round/60*(retries+1)} minutes for #{repl_group_id} to delete, giving up. #{e}" if retries > 2
        wait_start_time = Time.now
        retries += 1
        retry
      rescue Aws::Waiters::Errors::FailureStateError
        MU.log "#{repl_group_id} disappeared on us"
      end
    end
  end

  MU.log "#{repl_group_id} has been terminated"
  unless noop
    MU::Cloud::AWS::CacheCluster.delete_subnet_group(subnet_group, region: region) if subnet_group
    MU::Cloud::AWS::CacheCluster.delete_parameter_group(parameter_group, region: region) if parameter_group && !parameter_group.start_with?("default")
  end
end

Public Instance Methods

addStandardTags(resource, resource_type, region: MU.curRegion) click to toggle source

Add our standard tag set to an Amazon ElasticCache resource. @param resource [String]: The name of the resource @param resource_type [String]: The type of the resource @param region [String]: The cloud provider region

# File modules/mu/providers/aws/cache_cluster.rb, line 109
def addStandardTags(resource, resource_type, region: MU.curRegion)
  MU.log "Adding tags to ElasticCache resource #{resource}"
  MU::Cloud::AWS.elasticache(region: region).add_tags_to_resource(
    resource_name: MU::Cloud::AWS::CacheCluster.getARN(resource, resource_type, "elasticache", region: @region, credentials: @credentials),
    tags: allTags
  )
end
allTags() click to toggle source

Construct all our tags. @return [Array]: All our standard tags and any custom tags.

# File modules/mu/providers/aws/cache_cluster.rb, line 84
def allTags
  tags = []
  MU::MommaCat.listStandardTags.each_pair { |name, value|
    tags << {key: name, value: value}
  }

  if @config['optional_tags']
    MU::MommaCat.listOptionalTags.each_pair { |name, value|
      tags << {key: name, value: value}
    }
  end

  if @config['tags']
    @config['tags'].each { |tag|
      tags << {key: tag['key'], value: tag['value']}
    }
  end

  return tags
end
arn() click to toggle source

Canonical Amazon Resource Number for this resource @return [String]

# File modules/mu/providers/aws/cache_cluster.rb, line 37
def arn
  "arn:"+(MU::Cloud::AWS.isGovCloud?(@region) ? "aws-us-gov" : "aws")+":elasticache:"+@region+":"+MU::Cloud::AWS.credToAcct(@credentials)+":cluster/"+@cloud_id
end
create() click to toggle source

Called automatically by {MU::Deploy#createResources} @return [String]: The cloud provider's identifier for this cache cluster instance.

# File modules/mu/providers/aws/cache_cluster.rb, line 119
def create
  @config["snapshot_id"] =
    if @config["creation_style"] == "existing_snapshot"
      getExistingSnapshot ? getExistingSnapshot : createNewSnapshot
    elsif @config["creation_style"] == "new_snapshot"
      createNewSnapshot
    end

    # Should we base all our resource names on @mu_name even though only the cache cluster / cache replication group identifiers are the isssue?
  @config['identifier'] = @mu_name
  @config["subnet_group_name"] = @mu_name
  createSubnetGroup

  # Shared configuration elements between cache clusters and cache replication groups
  config_struct = {
    cache_node_type: @config["size"],
    engine: @config["engine"],
    engine_version: @config["engine_version"],
    cache_subnet_group_name: @config["subnet_group_name"],
    preferred_maintenance_window: @config["preferred_maintenance_window"],
    port: @config["port"],
    auto_minor_version_upgrade: @config["auto_minor_version_upgrade"],
    security_group_ids: @config["security_group_ids"],
    tags: allTags
  }

  if @config["engine"] == "redis"
    config_struct[:snapshot_name] = @config["snapshot_id"] if @config["snapshot_id"]
    config_struct[:snapshot_arns] = @config["snapshot_arn"] if @config["snapshot_arn"]
    config_struct[:snapshot_retention_limit] = @config["snapshot_retention_limit"] if @config["snapshot_retention_limit"]
    config_struct[:snapshot_window] = @config["snapshot_window"] if @config["snapshot_window"]
  end

  if @config.has_key?("parameter_group_family")
    # downcasing parameter_group_name because  AWS downcases the name for us and we won't be able to find it.
    # AWS downcases all resource names in ElastiCache for some reason,
    # but other resources don't seem to be case sensitive when we try to retrieve them.
    @config["parameter_group_name"] = @mu_name.downcase
    createParameterGroup
    config_struct[:cache_parameter_group_name] = @config["parameter_group_name"]
  end

  config_struct[:notification_topic_arn] = @config["notification_topic_arn"] if @config["notification_topic_arn"]

  # Mu already cleans up our resources for us when we fail mid creation, so not doing begin/ensure to cleanup in case we fail.
  if @config["create_replication_group"]
    config_struct[:automatic_failover_enabled] = @config['automatic_failover']
    config_struct[:replication_group_id] = @config['identifier']
    config_struct[:replication_group_description] = @mu_name
    config_struct[:num_cache_clusters] = @config["node_count"]
    # config_struct[:primary_cluster_id] = @config["primary_cluster_id"]
    # config_struct[:preferred_cache_cluster_a_zs] = @config["preferred_cache_cluster_azs"]

    MU.log "Creating cache replication group #{@config['identifier']}"
    MU::Cloud::AWS.elasticache(region: @region, credentials: @credentials).create_replication_group(config_struct).replication_group

    wait_start_time = Time.now
    retries = 0
    begin
      MU::Cloud::AWS.elasticache(region: @region, credentials: @credentials).wait_until(:replication_group_available, replication_group_id: @config['identifier']) do |waiter|
        waiter.max_attempts = nil
        waiter.before_attempt do |attempts|
          MU.log "Waiting for cache replication group #{@config['identifier']} to become available", MU::NOTICE if attempts % 5 == 0
        end
        waiter.before_wait do |_attempts, r|
          throw :success if r.replication_groups.first.status == "available"
          throw :failure if Time.now - wait_start_time > 1800
        end
      end
    rescue Aws::Waiters::Errors::TooManyAttemptsError => e
      raise MuError, "Waited #{(Time.now - wait_start_time).round/60*(retries+1)} minutes for #{@config['identifier']} become available, giving up. #{e}" if retries > 2
      wait_start_time = Time.now
      retries += 1
      retry
    end

    resp = MU::Cloud::AWS::CacheCluster.getCacheReplicationGroupById(@config['identifier'], region: @region)

    # We want to make sure the clusters in the cache replication group get our tags
    resp.member_clusters.each { |member|
      addStandardTags(member, "cluster", region: @region)
    }

    MU::Cloud.resourceClass("AWS", "DNSZone").genericMuDNSEntry(
      name: resp.replication_group_id,
      target: "#{resp.node_groups.first.primary_endpoint.address}.",
      cloudclass: MU::Cloud::CacheCluster,
      sync_wait: @config['dns_sync_wait']
    )

    resp.node_groups.first.node_group_members.each { |member|
      MU::Cloud.resourceClass("AWS", "DNSZone").genericMuDNSEntry(
        name: member.cache_cluster_id,
        target: "#{member.read_endpoint.address}.",
        cloudclass: MU::Cloud::CacheCluster,
        sync_wait: @config['dns_sync_wait']
      )

    }

    MU.log "Cache replication group #{@config['identifier']} is ready to use"
    @cloud_id = resp.replication_group_id
  else
    config_struct[:cache_cluster_id] = @config['identifier']
    config_struct[:az_mode] = @config["multi_az"] ? "cross-az" : "single-az"
    config_struct[:num_cache_nodes] = @config["node_count"]
    # config_struct[:replication_group_id] = @config["replication_group_id"] if @config["replication_group_id"]
    # config_struct[:preferred_availability_zone] = @config["preferred_availability_zone"] if @config["preferred_availability_zone"] && @config["az_mode"] == "single-az"
    # config_struct[:preferred_availability_zones] = @config["preferred_availability_zones"] if @config["preferred_availability_zones"] && @config["az_mode"] == "cross-az"

    MU.log "Creating cache cluster #{@config['identifier']}"
    begin
      MU::Cloud::AWS.elasticache(region: @region, credentials: @credentials).create_cache_cluster(config_struct).cache_cluster
    rescue ::Aws::ElastiCache::Errors::InvalidParameterValue => e
      if e.message.match(/security group (sg-[^\s]+)/)
        bad_sg = Regexp.last_match[1]
        MU.log "Removing invalid security group #{bad_sg} from Cache Cluster #{@mu_name}", MU::WARN, details: e.message
        config_struct[:security_group_ids].delete(bad_sg)
        retry
      else
        raise e
      end
    end

    wait_start_time = Time.now
    retries = 0
    begin
      MU::Cloud::AWS.elasticache(region: @region, credentials: @credentials).wait_until(:cache_cluster_available, cache_cluster_id: @config['identifier']) do |waiter|
        waiter.max_attempts = nil
        waiter.before_attempt do |attempts|
          MU.log "Waiting for cache cluster #{@config['identifier']} to become available", MU::NOTICE if attempts % 5 == 0
        end
        waiter.before_wait do |_attempts, r|
          throw :success if r.cache_clusters.first.cache_cluster_status  == "available"
          throw :failure if Time.now - wait_start_time > 1800
        end
      end
    rescue Aws::Waiters::Errors::TooManyAttemptsError => e
      raise MuError, "Waited #{(Time.now - wait_start_time).round/60*(retries+1)} minutes for #{@config['identifier']} to become available, giving up. #{e}" if retries > 2
      wait_start_time = Time.now
      retries += 1
      retry
    end

    resp = MU::Cloud::AWS::CacheCluster.getCacheClusterById(@config['identifier'], region: @region, credentials: @credentials)
    MU.log "Cache Cluster #{@config['identifier']} is ready to use"
    @cloud_id = resp.cache_cluster_id
  end
end
createNewSnapshot() click to toggle source

Generate a snapshot from the Cache Cluster described in this instance. @return [String]: The cloud provider's identifier for the snapshot.

# File modules/mu/providers/aws/cache_cluster.rb, line 513
def createNewSnapshot
  snap_id = @deploy.getResourceName(@config["name"]) + Time.new.strftime("%M%S").to_s

  attempts = 0
  begin
   MU::Cloud::AWS.elasticache(region: @region, credentials: @credentials).create_snapshot(
      cache_cluster_id: @config["identifier"],
      snapshot_name: snap_id
    )
  rescue Aws::ElastiCache::Errors::InvalidCacheClusterState => e
    if attempts < 10
      MU.log "Tried to create snapshot for #{@config["identifier"]} but cache cluster is busy, retrying a few times"
      attempts += 1
      sleep 30
      retry
    else
      raise MuError, "Failed to create snpashot for cache cluster #{@config["identifier"]}: #{e.inspect}"
    end
  end
  
  attempts = 0
  loop do
    MU.log "Waiting for snapshot of cache cluster  #{@config["identifier"]} to be ready...", MU::NOTICE if attempts % 20 == 0
    MU.log "Waiting for snapshot of cache cluster #{@config["identifier"]} to be ready...", MU::DEBUG

    snapshot_resp = MU::Cloud::AWS.elasticache(region: @region, credentials: @credentials).describe_snapshots(snapshot_name: snap_id)
    attempts += 1
    break unless snapshot_resp.snapshots.first.snapshot_status != "available"
    sleep 15
  end

  return snap_id
end
createParameterGroup() click to toggle source

Create a Cache Cluster parameter group.

# File modules/mu/providers/aws/cache_cluster.rb, line 348
def createParameterGroup        
  MU.log "Creating a cache cluster parameter group #{@config["parameter_group_name"]}"
  MU::Cloud::AWS.elasticache(region: @region, credentials: @credentials).create_cache_parameter_group(
    cache_parameter_group_name: @config["parameter_group_name"],
    cache_parameter_group_family: @config["parameter_group_family"],
    description: "Parameter group for #{@config["parameter_group_family"]}"
  )

  if @config.has_key?("parameter_group_parameters") && !@config["parameter_group_parameters"].empty?
    params = []
    @config["parameter_group_parameters"].each { |item|
      params << {parameter_name: item['name'], parameter_value: item['value']}
    }

    MU.log "Modifiying cache cluster parameter group #{@config["parameter_group_name"]}"
    MU::Cloud::AWS.elasticache(region: @region, credentials: @credentials).modify_cache_parameter_group(
      cache_parameter_group_name: @config["parameter_group_name"],
      parameter_name_values: params
    )
  end
end
createSubnetGroup() click to toggle source

Create a subnet group for a Cache Cluster with the given config.

# File modules/mu/providers/aws/cache_cluster.rb, line 270
def createSubnetGroup
  subnet_ids = []
  if @config["vpc"] && !@config["vpc"].empty?
    raise MuError.new "Didn't find the VPC specified for #{@mu_name}", details: @config["vpc"].to_h unless @vpc

    vpc_id = @vpc.cloud_id

    # Getting subnet IDs
    if @config["vpc"]["subnets"].empty?
      @vpc.subnets.each { |subnet|
        subnet_ids << subnet.cloud_id
      }
      MU.log "No subnets specified for #{@config['identifier']}, adding all subnets in #{@vpc}", MU::DEBUG
    else
      @config["vpc"]["subnets"].each { |subnet|
        subnet_obj = @vpc.getSubnet(cloud_id: subnet["subnet_id"].to_s, name: subnet["subnet_name"].to_s)
        raise MuError.new "Couldn't find a live subnet matching #{subnet} in #{@vpc}", details: @vpc.subnets if subnet_obj.nil?
        subnet_ids << subnet_obj.cloud_id
      }
    end
  else
    # If we didn't specify a VPC try to figure out if the account has a default VPC
    vpc_id = nil
    subnets = []
    MU::Cloud::AWS.ec2(region: @region, credentials: @credentials).describe_vpcs.vpcs.each { |vpc|
      if vpc.is_default
        vpc_id = vpc.vpc_id
        subnets = MU::Cloud::AWS.ec2(region: @region, credentials: @credentials).describe_subnets(
          filters: [
            {
              name: "vpc-id", 
              values: [vpc_id]
            }
          ]
        ).subnets
        break
      end
    }

    if !subnets.empty?
      mu_subnets = []
      subnets.each { |subnet|
        subnet_ids << subnet.subnet_id
        mu_subnets << {"subnet_id" => subnet.subnet_id}
      }

      @config['vpc'] = {
        "vpc_id" => vpc_id,
        "subnets" => mu_subnets
      }

      MU.log "Using default VPC for cache cluster #{@config['identifier']}"
    end
  end

  if subnet_ids.empty?
    raise MuError, "Can't create subnet group #{@config["subnet_group_name"]} because I couldn't find a VPC or subnets"
  else
    MU.log "Creating subnet group #{@config["subnet_group_name"]} for cache cluster #{@config['identifier']}"

    MU::Cloud::AWS.elasticache(region: @region, credentials: @credentials).create_cache_subnet_group(
      cache_subnet_group_name: @config["subnet_group_name"],
      cache_subnet_group_description: @config["subnet_group_name"],
      subnet_ids: subnet_ids
    )

    allowBastionAccess

    if @dependencies.has_key?('firewall_rule')
      @config["security_group_ids"] = []
      @dependencies['firewall_rule'].values.each { |sg|
        @config["security_group_ids"] << sg.cloud_id
      }
    end
  end
end
getExistingSnapshot() click to toggle source

@return [String]: The cloud provider's identifier for the snapshot.

# File modules/mu/providers/aws/cache_cluster.rb, line 548
def getExistingSnapshot
  MU::Cloud::AWS.elasticache(region: @region, credentials: @credentials).describe_snapshots(snapshot_name: @config["identifier"]).snapshots.first.snapshot_name
rescue NoMethodError
  raise MuError, "Snapshot #{@config["identifier"]} doesn't exist, make sure you provided a valid snapshot ID/Name"
end
getParameterGroup() click to toggle source

Retrieve a Cache Cluster parameter group name of on existing parameter group. @return [String]: Cache Cluster parameter group name.

# File modules/mu/providers/aws/cache_cluster.rb, line 372
def getParameterGroup
  MU::Cloud::AWS.elasticache(region: @region, credentials: @credentials).describe_cache_parameter_groups(
    cache_parameter_group_name: @config["parameter_group_name"]
  ).cache_parameter_groups.first.cache_parameter_group_name
end
groom() click to toggle source

Called automatically by {MU::Deploy#createResources}

# File modules/mu/providers/aws/cache_cluster.rb, line 379
def groom
  # Do we have anything to do here??
end
notify() click to toggle source

Register a description of this cache cluster / cache replication group with this deployment's metadata.

# File modules/mu/providers/aws/cache_cluster.rb, line 404
def notify
  ### TO DO: Flatten the replication group deployment metadata structure. It is probably waaaaaaay too nested.
  if @config["create_replication_group"]
    repl_group = MU::Cloud::AWS::CacheCluster.getCacheReplicationGroupById(@config['identifier'], region: @region, credentials: @credentials)
    # DNS records for the "real" zone should always be registered as late as possible so override_existing only overwrites the records after the resource is ready to use.
    if @config['dns_records']
      @config['dns_records'].each { |dnsrec|
        dnsrec['name'] = repl_group.node_groups.first.primary_endpoint.address.downcase if !dnsrec.has_key?('name')
        dnsrec['name'] = "#{dnsrec['name']}.#{MU.environment.downcase}" if dnsrec["append_environment_name"] && !dnsrec['name'].match(/\.#{MU.environment.downcase}$/)
      }
    end
    # XXX this should be a call to @deploy.nameKitten
    MU::Cloud.resourceClass("AWS", "DNSZone").createRecordsFromConfig(@config['dns_records'], target: repl_group.node_groups.first.primary_endpoint.address)

    deploy_struct = {
      "identifier" => repl_group.replication_group_id,
      "create_style" => @config["create_style"],
      "region" => @region,
      "members" => repl_group.member_clusters,
      "automatic_failover" => repl_group.automatic_failover,
      "snapshotting_cluster_id" => repl_group.snapshotting_cluster_id,
      "primary_endpoint" => repl_group.node_groups.first.primary_endpoint.address,
      "primary_port" => repl_group.node_groups.first.primary_endpoint.port
    }

    repl_group.member_clusters.each { |id|
      cluster = MU::Cloud::AWS::CacheCluster.getCacheClusterById(id, region: @region)

      vpc_sg_ids = []
      cluster.security_groups.each { |vpc_sg|
        vpc_sg_ids << vpc_sg.security_group_id
      }

      cache_sg_ids = []
      unless cluster.cache_security_groups.empty?
        cluster.cache_security_groups.each { |cache_sg|
          cache_sg_ids << cache_sg.security_group_id
        }
      end

      deploy_struct[id] = {
        "configuration_endpoint" => cluster.configuration_endpoint,
        "cache_node_type" => cluster.cache_node_type,
        "engine" => cluster.engine,
        "engine_version" => cluster.engine_version,
        "num_cache_nodes" => cluster.num_cache_nodes,
        "preferred_maintenance_window" => cluster.preferred_maintenance_window,
        "notification_configuration" => cluster.notification_configuration,
        "cache_security_groups" => cache_sg_ids,
        "cache_parameter_group" => cluster.cache_parameter_group.cache_parameter_group_name,
        "cache_subnet_group_name" => cluster.cache_subnet_group_name,
        "cache_nodes" => cluster.cache_nodes,
        "auto_minor_version_upgrade" => cluster.auto_minor_version_upgrade,
        "vpc_security_groups" => vpc_sg_ids,
        "replication_group_id" => cluster.replication_group_id,
        "snapshot_retention_limit" => cluster.snapshot_retention_limit,
        "snapshot_window" => cluster.snapshot_window              
      }
    }

    repl_group.node_groups.first.node_group_members.each{ |member| 
      deploy_struct[member.cache_cluster_id]["cache_node_id"] = member.cache_node_id
      deploy_struct[member.cache_cluster_id]["read_endpoint_address"] = member.read_endpoint.address
      deploy_struct[member.cache_cluster_id]["read_endpoint_port"] = member.read_endpoint.port
      deploy_struct[member.cache_cluster_id]["current_role"] = member.current_role
    }
  else
    cluster = MU::Cloud::AWS::CacheCluster.getCacheClusterById(@config['identifier'], region: @region, credentials: @credentials)

    vpc_sg_ids = []
    cluster.security_groups.each { |vpc_sg|
      vpc_sg_ids << vpc_sg.security_group_id
    }

    cache_sg_ids = []
    unless cluster.cache_security_groups.empty?
      cluster.cache_security_groups.each { |cache_sg|
        cache_sg_ids << cache_sg.security_group_id
      }
    end

    deploy_struct = {
      "cache_node_type" => cluster.cache_node_type,
      "engine" => cluster.engine,
      "engine_version" => cluster.engine_version,
      "num_cache_nodes" => cluster.num_cache_nodes,
      "preferred_maintenance_window" => cluster.preferred_maintenance_window,
      "notification_configuration" => cluster.notification_configuration,
      "cache_security_groups" => cache_sg_ids,
      "cache_parameter_group" => cluster.cache_parameter_group.cache_parameter_group_name,
      "cache_subnet_group_name" => cluster.cache_subnet_group_name,
      "cache_nodes" => cluster.cache_nodes,
      "auto_minor_version_upgrade" => cluster.auto_minor_version_upgrade,
      "vpc_security_groups" => vpc_sg_ids,
      "replication_group_id" => cluster.replication_group_id,
      "snapshot_retention_limit" => cluster.snapshot_retention_limit,
      "snapshot_window" => cluster.snapshot_window              
    }
    if !cluster.configuration_endpoint.nil?
      deploy_struct["configuration_endpoint_address"] = cluster.configuration_endpoint.address
      deploy_struct["configuration_endpoint_port"] = cluster.configuration_endpoint.port
    end
  end

  return deploy_struct
end