module Docker::Registry::Sync::CMD

Attributes

config[RW]
producer_finished[RW]
status_queue[RW]
threads[RW]
work_queue[RW]

Public Class Methods

configure(source_bucket, target_buckets, sqs_queue, use_sse, source_uses_sse, pool, log_level = :debug) click to toggle source
# File lib/docker/registry/sync/cmd.rb, line 16
# Parses the string arguments and stores the resulting settings through
# Docker::Registry::Sync.configure.
#
# source_bucket   - "region:bucket" string, or nil.
# target_buckets  - comma separated list of colon separated entries, or nil.
#                   Each entry is split on ':' and stored as an array.
# sqs_queue       - "region:uri" string, or nil. The uri is prefixed with
#                   "https://" to build the queue url.
# use_sse         - stored as config.sse.
# source_uses_sse - stored as config.source_sse.
# pool            - stored as config.pool_size.
# log_level       - stored as config.log_level (defaults to :debug).
#
# Also resets @synced_images to a fresh RingBuffer of 10000 entries.
# Returns the configuration object, which is kept in @config.
def configure(source_bucket, target_buckets, sqs_queue, use_sse, source_uses_sse, pool, log_level = :debug)
  # Each pair of locals stays nil when the matching argument is nil
  origin_region, origin_name = source_bucket.split(':') unless source_bucket.nil?
  queue_region, queue_uri = sqs_queue.split(':') unless sqs_queue.nil?

  destinations = target_buckets.nil? ? nil : target_buckets.split(',').map { |entry| entry.split(':') }

  @synced_images = RingBuffer.new 10000

  Docker::Registry::Sync.configure do |settings|
    settings.source_bucket = origin_name
    settings.source_region = origin_region
    settings.target_buckets = destinations
    settings.source_sse = source_uses_sse
    settings.sse = use_sse
    settings.sqs_region = queue_region
    settings.pool_size = pool
    settings.sqs_url = "https://#{queue_uri}"
    settings.log_level = log_level
  end
  @config = Docker::Registry::Sync.config
end
configure_signal_handlers() click to toggle source
# File lib/docker/registry/sync/cmd.rb, line 51
# Installs INT and TERM handlers that request a graceful shutdown.
#
# Resets @terminated to false and traps both signals. On either signal the
# handler marks the producer as finished, sets @terminated and drops any work
# still waiting on @work_queue.
#
# The handler body is kept trap-safe on purpose:
# * It does not take the @threads monitor. Ruby raises ThreadError ("can't be
#   called from trap context") when a mutex is locked inside a trap handler,
#   so the flags would never be set.
# * The log line is written from a short-lived thread, because a logger may
#   take a lock internally (the stdlib Logger does).
# * The queue is only cleared when it exists: callers install the handlers
#   before configure_workers has created it.
def configure_signal_handlers
  @terminated = false

  %w[INT TERM].each do |signal|
    Signal.trap(signal) do
      @producer_finished = true
      @terminated = true
      @work_queue.clear unless @work_queue.nil?
      Thread.new { @config.logger.error "Received #{signal} signal..." }
    end
  end
end
configure_workers() click to toggle source
# File lib/docker/registry/sync/cmd.rb, line 72
# Resets the worker bookkeeping for a new run.
#
# Creates one empty (nil) slot per configured pool entry in @threads, extends
# that array with MonitorMixin, creates a condition variable on it, and
# creates fresh work and status queues. Finally clears @producer_finished.
def configure_workers
  slots = Array.new(@config.pool_size)
  @threads = slots
  @work_queue, @status_queue = Queue.new, Queue.new

  slots.extend(MonitorMixin)
  @threads_available = slots.new_cond

  @producer_finished = false
end
finalize_message(receipt_handle) click to toggle source
# File lib/docker/registry/sync/sqs.rb, line 39
# Deletes the message identified by receipt_handle from the configured SQS
# queue (@config.sqs_url in @config.sqs_region).
#
# Returns whatever the SQS client's delete_message call returns.
def finalize_message(receipt_handle)
  client = Aws::SQS::Client.new(region: @config.sqs_region)
  client.delete_message(queue_url: @config.sqs_url, receipt_handle: receipt_handle)
end
finalize_workers() click to toggle source
# File lib/docker/registry/sync/cmd.rb, line 89
# Signals that no more work will be produced, waits for the consumer thread
# and every worker thread, then folds the job results from @status_queue.
#
# Safe to call when workers were never configured or were already finalized
# (for example from an error handler): missing threads and queues are skipped
# instead of raising NoMethodError on nil.
#
# Returns true only when every collected job result was truthy and the run
# was not terminated by a signal.
def finalize_workers
  if @threads.nil?
    @producer_finished = true
  else
    @threads.synchronize { @producer_finished = true }
  end

  unless @consumer_thread.nil?
    @consumer_thread.join
    @consumer_thread = nil
  end
  @threads.each { |t| t.join unless t.nil? } unless @threads.nil?
  @config.logger.info "Processing job results..."

  success = true
  loop do
    begin
      # No status queue means no job ever ran; the non-blocking pop raises
      # ThreadError once every result has been consumed
      raise ThreadError, 'no status queue' if @status_queue.nil?
      # One job failure is a run failure
      success = @status_queue.pop(true) && success
    rescue ThreadError
      @config.logger.info "Finished processing job results..."
      break
    end
  end
  success && !@terminated
end
image_exists?(image, bucket, region) click to toggle source
# File lib/docker/registry/sync/s3.rb, line 10
# Checks whether the repository index for image is present in bucket.
#
# Issues a head_object request for
# "registry/repositories/<image>/_index_images" using an S3 client for
# region. Returns false when S3 answers NotFound and true when the request
# succeeds; any other error propagates.
def image_exists?(image, bucket, region)
  client = Aws::S3::Client.new(region: region)
  index_key = "registry/repositories/#{image}/_index_images"
  begin
    client.head_object(bucket: bucket, key: index_key)
    true
  rescue Aws::S3::Errors::NotFound
    false
  end
end
queue_sync(image, tag) click to toggle source
# File lib/docker/registry/sync/cmd.rb, line 127
# Enqueues one sync request per configured target bucket.
#
# Each request is a JSON document carrying the image, the tag, a retry
# counter of 0, the configured source bucket/region, and the target
# bucket/region. The target's sse flag is true when the target entry has a
# third element. All requests are handed to send_message_batch in one call.
#
# Returns 0 when send_message_batch reports success, otherwise 1.
def queue_sync(image, tag)
  payloads = @config.target_buckets.map do |target_region, target_bucket, sse_marker|
    request = {
      retries: 0,
      image: image,
      tag: tag,
      source: { bucket: @config.source_bucket, region: @config.source_region },
      target: { bucket: target_bucket, region: target_region, sse: !sse_marker.nil? }
    }
    JSON.dump(request)
  end
  return 0 if send_message_batch(payloads)

  1
end
run_sync() click to toggle source
# File lib/docker/registry/sync/cmd.rb, line 145
# Polls the SQS queue and syncs one queued image or tag per iteration until
# the process is terminated.
#
# For each received message the JSON body is decoded. When the image index
# already exists in the target bucket only the tag is synced (sync_tag),
# otherwise the whole repository is synced (sync_repo). The message is deleted
# from the queue only after a successful sync; a failed sync leaves it on the
# queue. Between iterations the loop sleeps for
# @config.empty_queue_sleep_time.
#
# Any StandardError stops the loop: it is logged with its backtrace, running
# workers (if any) are finalized, and 1 is returned. Otherwise returns 0 once
# the loop ends after a completed iteration.
def run_sync
  ec = 1
  configure_signal_handlers
  begin
    @config.logger.info 'Polling queue for images to sync...'
    sqs = Aws::SQS::Client.new(region: @config.sqs_region)
    resp = sqs.receive_message(
      queue_url: @config.sqs_url,
      max_number_of_messages: 1,
      visibility_timeout: 900, # Give ourselves 15min to sync the image
      wait_time_seconds: 10 # Wait a maximum of 10s for a new message
    )
    @config.logger.info "SQS returned #{resp.messages.length} new images to sync..."
    if resp.messages.length == 1
      message = resp.messages[0]
      data = JSON.load(message.body)
      @config.logger.info "Image sync data:  #{data}"

      image = data['image']
      tag = data['tag']
      target = data['target']
      source = data['source']
      destination = "#{target['region']}:#{target['bucket']}"

      # A repository already present in the target only needs the tag synced
      tag_only = image_exists?(image, target['bucket'], target['region'])
      kind = tag_only ? 'tag' : 'image'
      subject = tag_only ? "#{image}:#{tag}" : image

      configure_workers
      start_workers
      # Announce the sync before it starts, for both kinds
      @config.logger.info("Syncing #{kind}: #{subject} to #{destination}")
      success = if tag_only
                  sync_tag(image, tag, target['bucket'], target['region'], target['sse'],
                           source['bucket'], source['region'])
                else
                  sync_repo(image, target['bucket'], target['region'], target['sse'],
                            source['bucket'], source['region'])
                end
      success = finalize_workers && success

      if success
        @config.logger.info("Finished syncing #{kind}: #{subject} to #{destination}")
        finalize_message(message.receipt_handle)
      else
        @config.logger.error("Failed to sync #{kind}, leaving on queue: #{subject} to #{destination}")
      end
    end
    ec = 0
    sleep @config.empty_queue_sleep_time unless @terminated
  rescue StandardError => e
    @config.logger.error "An unknown error occurred while monitoring queue: #{e}"
    @config.logger.error e.backtrace
    @config.logger.error 'Exiting...'
    @terminated = true
    ec = 1
    # make sure there are no hangers-on; @consumer_thread is only set while
    # workers are running, so there is nothing to finalize when it is nil
    finalize_workers unless @consumer_thread.nil?
  end until @terminated
  ec
end
send_message_batch(messages, retries = 5) click to toggle source
# File lib/docker/registry/sync/sqs.rb, line 11
# Sends messages to the configured SQS queue as one batch, retrying the
# entries SQS reports as failed.
#
# messages - array of message body strings. Each entry id is the MD5 hex
#            digest of its body, which is how failed entries are matched
#            back to their bodies.
# retries  - number of attempts left (default 5). When it reaches 0 the
#            remaining messages are logged as failed.
#
# Returns true when every message was accepted (or there was nothing to
# send), false when the attempts were exhausted.
def send_message_batch(messages, retries = 5)
  if retries <= 0
    messages.each { |msg| @config.logger.error "Failed to Enqueue message: #{msg}" }
    return false
  end
  # SQS rejects a batch request without entries, so skip the call entirely
  return true if messages.empty?

  entries = messages.map do |msg|
    @config.logger.info "Enqueuing message: #{msg}"
    { id: Digest::MD5.hexdigest(msg), message_body: msg }
  end
  sqs = Aws::SQS::Client.new(region: @config.sqs_region)
  resp = sqs.send_message_batch(queue_url: @config.sqs_url,
                                entries: entries)
  return true if resp.failed.empty?

  bodies_by_id = entries.each_with_object({}) { |entry, index| index[entry[:id]] = entry[:message_body] }
  rerun = resp.failed.map do |failed|
    msg = bodies_by_id[failed.id]
    @config.logger.warn "Failed to Enqueue message, re-enqueuing: #{msg}"
    msg
  end
  sleep 1 # short sleep before trying again...
  send_message_batch(rerun, retries - 1)
end
start_workers() click to toggle source
# File lib/docker/registry/sync/cmd.rb, line 83
# Starts the consumer loop (sync_key_consumer) on its own thread and keeps
# the thread in @consumer_thread.
def start_workers
  @consumer_thread = Thread.new { sync_key_consumer }
end
sync(image, tag) click to toggle source
# File lib/docker/registry/sync/cmd.rb, line 111
# Syncs image:tag to every configured target bucket in one run.
#
# For each target, only the tag is synced when the image index already exists
# there (sync_tag); otherwise the whole repository is synced (sync_repo).
# Every target is attempted even after an earlier one failed.
#
# Returns 0 when every target synced and the workers finished successfully,
# otherwise 1. A failure for any target fails the run, and so does an empty
# target list.
def sync(image, tag)
  configure_signal_handlers
  configure_workers
  start_workers
  results = @config.target_buckets.map do |region, bucket, sse|
    if image_exists?(image, bucket, region)
      sync_tag(image, tag, bucket, region, !sse.nil?)
    else
      sync_repo(image, bucket, region, !sse.nil?)
    end
  end
  # Combine the per-target results instead of keeping only the last one
  success = !results.empty? && results.all?
  success = finalize_workers && success
  success ? 0 : 1
end
sync_image(image_id, bucket, region, sse, source_bucket = nil, source_region = nil) click to toggle source
# File lib/docker/registry/sync/s3.rb, line 69
# Queues every layer of an image for copying to the target bucket.
#
# Reads "registry/images/<image_id>/ancestry" from the source bucket, a JSON
# list of image ids, and calls sync_prefix for each listed image that is not
# yet recorded in @synced_images for this region and bucket. Each image synced
# this way is then recorded there.
#
# source_bucket and source_region fall back to the configured source when
# not given.
def sync_image(image_id, bucket, region, sse, source_bucket = nil, source_region = nil)
  source_region = @config.source_region unless source_region
  source_bucket = @config.source_bucket unless source_bucket
  origin = Aws::S3::Client.new(region: source_region)
  destination = Aws::S3::Client.new(region: region)

  ancestry_doc = origin.get_object(bucket: source_bucket, key: "registry/images/#{image_id}/ancestry")
  # Ancestry includes self
  JSON.load(ancestry_doc.body.read).each do |layer|
    marker = "#{layer}:#{region}:#{bucket}"
    next if @synced_images.include?(marker)

    sync_prefix(origin, destination, bucket, sse, "registry/images/#{layer}/", source_bucket)
    @synced_images << marker
  end
end
sync_key_consumer() click to toggle source
# File lib/docker/registry/sync/s3.rb, line 122
# Consumer loop: takes copy requests off @work_queue and runs each one in a
# worker thread stored in a free slot of @threads.
#
# Every request is an options hash for copy_object plus a :region entry that
# selects the region of the S3 client. Each worker pushes true or false onto
# @status_queue depending on whether its copy succeeded. The loop ends once
# @producer_finished is set and the work queue is empty.
def sync_key_consumer
  @config.logger.info "Starting sync consumer..."
  loop do
    break if @producer_finished && @work_queue.length == 0
    t_index = nil

    # Poll every 0.1s until at least one slot is free. A slot counts as free
    # when it is empty (nil), its thread has a status of false, or its thread
    # has set the 'finished' thread-local.
    begin
      sleep 0.1
      busy = @threads.select { |t| t.nil? || t.status == false  || t['finished'].nil? == false }.length == 0
    end until !busy
    # Use the last free slot
    t_index = @threads.rindex { |t| t.nil? || t.status == false || t['finished'].nil? == false }

    begin
      # Non-blocking pop: raises ThreadError when the queue is empty
      opts = @work_queue.pop(true)
    rescue ThreadError
      @config.logger.info "No work found on the queue, sleeping..."
      sleep 1
    else
      if opts[:key]
        # Wait for the slot's previous thread before replacing it
        @threads[t_index].join unless @threads[t_index].nil?
        @threads[t_index] = Thread.new do
          @config.logger.info "Worker syncing key: #{opts[:key]}"
          target_client = Aws::S3::Client.new(region: opts[:region])
          # :region is removed so it is not passed on to copy_object
          opts.delete :region
          success = false
          begin
            target_client.copy_object(opts)
            success = true
            @config.logger.info "Worker finished syncing key: #{opts[:key]}"
          rescue StandardError => e
            @config.logger.error "An unknown error occoured while copying object in s3: #{e}"
            @config.logger.error e.backtrace
          ensure
            # Always free the slot and report the outcome, even on failure
            Thread.current['finished'] = true
            @status_queue << success
          end
        end
      else
        @config.logger.info "Queued work empty: #{opts}"
      end
    end
  end
end
sync_keys(target_client, target_bucket, target_sse, keys, source_bucket) click to toggle source
# File lib/docker/registry/sync/s3.rb, line 103
# Queues one copy request per key onto @work_queue.
#
# Each request carries the acl, the target client's region, the target
# bucket, the key, and a copy source of "<source_bucket>/<key>".
# Server side encryption (AES256) is requested when either @config.sse or
# target_sse is set. The source SSE algorithm entry is added when
# @config.source_sse is set. Sleeps 0.1s after queuing each key.
#
# Returns keys.
def sync_keys(target_client, target_bucket, target_sse, keys, source_bucket)
  keys.each do |key|
    @config.logger.info "Syncing key #{source_bucket}/#{key} to bucket #{target_bucket}"
    request = {
      acl: 'bucket-owner-full-control',
      region: target_client.config[:region],
      bucket: target_bucket,
      key: key,
      copy_source: "#{source_bucket}/#{key}"
    }
    request[:server_side_encryption] = 'AES256' if @config.sse || target_sse
    request[:copy_source_sse_customer_algorithm] = 'AES256' if @config.source_sse
    @work_queue << request
    sleep 0.1
  end
end
sync_prefix(source_client, target_client, target_bucket, target_sse, prefix, source_bucket) click to toggle source
# File lib/docker/registry/sync/s3.rb, line 86
# Queues every object under prefix in the source bucket for copying.
#
# Lists the objects page by page, collects their keys, and hands the complete
# list to sync_keys. Returns whatever sync_keys returns.
def sync_prefix(source_client, target_client, target_bucket, target_sse, prefix, source_bucket)
  keys = []
  page = source_client.list_objects(bucket: source_bucket, prefix: prefix)

  loop do
    keys.concat(page.contents.map(&:key))
    break if page.last_page?

    # next_page returns the following page; it must replace the current one
    # or the loop would re-read the first page forever
    page = page.next_page
  end
  sync_keys(target_client, target_bucket, target_sse, keys, source_bucket)
end
sync_repo(repo, bucket, region, sse, source_bucket = nil, source_region = nil) click to toggle source
# File lib/docker/registry/sync/s3.rb, line 46
# Syncs a whole repository to the target bucket.
#
# Queues everything under "registry/repositories/<repo>/", then reads the
# repository's _index_images document from the source bucket and calls
# sync_image for the 'id' of every entry in it.
#
# source_bucket and source_region fall back to the configured source when
# not given. Returns true on success. Any StandardError raised while syncing
# is logged with its backtrace and turned into a false return value.
def sync_repo(repo, bucket, region, sse, source_bucket = nil, source_region = nil)
  source_region = @config.source_region unless source_region
  source_bucket = @config.source_bucket unless source_bucket
  origin = Aws::S3::Client.new(region: source_region)
  destination = Aws::S3::Client.new(region: region)

  begin
    sync_prefix(origin, destination, bucket, sse, "registry/repositories/#{repo}/", source_bucket)

    index_doc = origin.get_object(bucket: source_bucket, key: "registry/repositories/#{repo}/_index_images")
    entries = JSON.load(index_doc.body.read)
    entries.each do |entry|
      sync_image(entry['id'], bucket, region, sse, source_bucket, source_region)
    end
    true
  rescue StandardError => e
    @config.logger.error "An unexpected error occoured while syncing repo #{repo}: #{e}"
    @config.logger.error e.backtrace
    false
  end
end
sync_tag(image, tag, bucket, region, sse, source_bucket = nil, source_region = nil) click to toggle source
# File lib/docker/registry/sync/s3.rb, line 22
# Syncs a single tag of a repository to the target bucket.
#
# Queues the tag's json key, the tag key and the repository's _index_images
# key, then reads the tag key from the source bucket to obtain the image id
# and calls sync_image for it.
#
# source_bucket and source_region fall back to the configured source when
# not given. Returns true on success. Any StandardError raised while syncing
# is logged with its backtrace and turned into a false return value.
def sync_tag(image, tag, bucket, region, sse, source_bucket = nil, source_region = nil)
  source_region = @config.source_region unless source_region
  source_bucket = @config.source_bucket unless source_bucket

  origin = Aws::S3::Client.new(region: source_region)
  destination = Aws::S3::Client.new(region: region)

  begin
    tag_keys = [
      "registry/repositories/#{image}/tag#{tag}_json",
      "registry/repositories/#{image}/tag_#{tag}",
      "registry/repositories/#{image}/_index_images"
    ]
    sync_keys(destination, bucket, sse, tag_keys, source_bucket)

    tag_doc = origin.get_object(bucket: source_bucket, key: "registry/repositories/#{image}/tag_#{tag}")
    sync_image(tag_doc.body.read, bucket, region, sse, source_bucket, source_region)
    true
  rescue StandardError => e
    @config.logger.error "An unexpected error occoured while syncing tag #{image}:#{tag}: #{e}"
    @config.logger.error e.backtrace
    false
  end
end