module Docker::Registry::Sync::CMD
Attributes
config[RW]
producer_finished[RW]
status_queue[RW]
threads[RW]
work_queue[RW]
Public Class Methods
configure(source_bucket, target_buckets, sqs_queue, use_sse, source_uses_sse, pool, log_level = :debug)
click to toggle source
# File lib/docker/registry/sync/cmd.rb, line 16 def configure(source_bucket, target_buckets, sqs_queue, use_sse, source_uses_sse, pool, log_level = :debug) unless source_bucket.nil? source_region, source_bucket = source_bucket.split(':') else source_region, source_bucket = nil, nil end unless target_buckets.nil? target_buckets = target_buckets.split(',').collect { |bucket| bucket.split(':') } else target_buckets = nil end unless sqs_queue.nil? sqs_region, sqs_uri = sqs_queue.split(':') else sqs_region, sqs_uri = nil, nil end @synced_images = RingBuffer.new 10000 Docker::Registry::Sync.configure do |config| config.source_bucket = source_bucket config.source_region = source_region config.target_buckets = target_buckets config.source_sse = source_uses_sse config.sse = use_sse config.sqs_region = sqs_region config.pool_size = pool config.sqs_url = "https://#{sqs_uri}" config.log_level = log_level end @config = Docker::Registry::Sync.config end
configure_signal_handlers()
click to toggle source
# File lib/docker/registry/sync/cmd.rb, line 51 def configure_signal_handlers @terminated = false Signal.trap('INT') do @config.logger.error 'Received INT signal...' @threads.synchronize do @producer_finished = true @terminated = true @work_queue.clear end end Signal.trap('TERM') do @config.logger.error 'Received TERM signal...' @threads.synchronize do @producer_finished = true @terminated = true @work_queue.clear end end end
configure_workers()
click to toggle source
# File lib/docker/registry/sync/cmd.rb, line 72 def configure_workers @threads = Array.new(@config.pool_size) @work_queue = Queue.new @status_queue = Queue.new @threads.extend(MonitorMixin) @threads_available = @threads.new_cond @producer_finished = false end
finalize_message(receipt_handle)
click to toggle source
# File lib/docker/registry/sync/sqs.rb, line 39 def finalize_message(receipt_handle) sqs = Aws::SQS::Client.new(region: @config.sqs_region) resp = sqs.delete_message(queue_url: @config.sqs_url, receipt_handle: receipt_handle) end
finalize_workers()
click to toggle source
# File lib/docker/registry/sync/cmd.rb, line 89 def finalize_workers @threads.synchronize do @producer_finished = true end @consumer_thread.join @consumer_thread = nil @threads.each { |t| t.join unless t.nil? } @config.logger.info "Processing job results..." success = true loop do begin # One job filure is a run failure success = @status_queue.pop(true) && success rescue ThreadError @config.logger.info "Finished processing job results..." break end end success && !@terminated end
image_exists?(image, bucket, region)
click to toggle source
# File lib/docker/registry/sync/s3.rb, line 10 def image_exists?(image, bucket, region) s3 = Aws::S3::Client.new(region: region) begin s3.head_object(bucket: bucket, key: "registry/repositories/#{image}/_index_images") rescue Aws::S3::Errors::NotFound false else true end end
queue_sync(image, tag)
click to toggle source
# File lib/docker/registry/sync/cmd.rb, line 127 def queue_sync(image, tag) msgs = @config.target_buckets.map do |region, bucket, sse| JSON.dump(retries: 0, image: image, tag: tag, source: { bucket: @config.source_bucket, region: @config.source_region }, target: { bucket: bucket, region: region, sse: !sse.nil? }) end send_message_batch(msgs) ? 0 : 1 end
run_sync()
click to toggle source
# File lib/docker/registry/sync/cmd.rb, line 145 def run_sync ec = 1 configure_signal_handlers begin @config.logger.info 'Polling queue for images to sync...' sqs = Aws::SQS::Client.new(region: @config.sqs_region) resp = sqs.receive_message( queue_url: @config.sqs_url, max_number_of_messages: 1, visibility_timeout: 900, # Give ourselves 15min to sync the image wait_time_seconds: 10, # Wait a maximum of 10s for a new message ) @config.logger.info "SQS returned #{resp.messages.length} new images to sync..." if resp.messages.length == 1 message = resp.messages[0] data = JSON.load(message.body) @config.logger.info "Image sync data: #{data}" if image_exists?(data['image'], data['target']['bucket'], data['target']['region']) configure_workers start_workers @config.logger.info("Syncing tag: #{data['image']}:#{data['tag']} to #{data['target']['region']}:#{data['target']['bucket']}") success = sync_tag(data['image'], data['tag'], data['target']['bucket'], data['target']['region'], data['target']['sse'], data['source']['bucket'], data['source']['region']) success = finalize_workers && success if success @config.logger.info("Finished syncing tag: #{data['image']}:#{data['tag']} to #{data['target']['region']}:#{data['target']['bucket']}") finalize_message(message.receipt_handle) else @config.logger.info("Falied to sync tag, leaving on queue: #{data['image']}:#{data['tag']} to #{data['target']['region']}:#{data['target']['bucket']}") end else configure_workers start_workers success = sync_repo(data['image'], data['target']['bucket'], data['target']['region'], data['target']['sse'], data['source']['bucket'], data['source']['region']) success = finalize_workers && success @config.logger.info("Syncing image: #{data['image']} to #{data['target']['region']}:#{data['target']['bucket']}") if success @config.logger.info("Finished syncing image: #{data['image']} to #{data['target']['region']}:#{data['target']['bucket']}") finalize_message(message.receipt_handle) else @config.logger.error("Failed to sync image, leaving on queue: #{data['image']} to #{data['target']['region']}:#{data['target']['bucket']}") end end end ec = 0 sleep @config.empty_queue_sleep_time unless @terminated rescue StandardError => e @config.logger.error "An unknown error occurred while monitoring queue: #{e}" @config.logger.error e.traceback @config.logger.error 'Exiting...' @terminated = true ec = 1 finalize_workers # make sure there are no hangers-on end until @terminated ec end
send_message_batch(messages, retries = 5)
click to toggle source
# File lib/docker/registry/sync/sqs.rb, line 11 def send_message_batch(messages, retries = 5) if retries <= 0 success = false messages.each do |msg| @config.logger.Error "Failed to Enqueue message: #{msg}" end else entries = messages.map do |msg| @config.logger.info "Enqueuing message: #{msg}" { id: Digest::MD5.hexdigest(msg), message_body: msg } end sqs = Aws::SQS::Client.new(region: @config.sqs_region) resp = sqs.send_message_batch(queue_url: @config.sqs_url, entries: entries) if resp.failed.length > 0 rerun = resp.failed.map do |failed| @config.logger.warn "Failed to Enqueue message, re-enqueuing: #{msg}" messages.select { |m| Digest::MD5.hexdigest(m) == failed.id }[0] end sleep 1 # short sleep before trying again... success = send_message_batch(rerun, retries - 1) else success = true end end success end
start_workers()
click to toggle source
# File lib/docker/registry/sync/cmd.rb, line 83 def start_workers @consumer_thread = Thread.new do sync_key_consumer end end
sync(image, tag)
click to toggle source
# File lib/docker/registry/sync/cmd.rb, line 111 def sync(image, tag) configure_signal_handlers configure_workers start_workers success = false @config.target_buckets.each do |region, bucket, sse| if image_exists?(image, bucket, region) success = sync_tag(image, tag, bucket, region, !sse.nil?) else success = sync_repo(image, bucket, region, !sse.nil?) end end success = finalize_workers && success success ? 0 : 1 end
sync_image(image_id, bucket, region, sse, source_bucket = nil, source_region = nil)
click to toggle source
# File lib/docker/registry/sync/s3.rb, line 69 def sync_image(image_id, bucket, region, sse, source_bucket = nil, source_region = nil) source_region ||= @config.source_region source_bucket ||= @config.source_bucket s3_source = Aws::S3::Client.new(region: source_region) s3_target = Aws::S3::Client.new(region: region) ancestry_resp = s3_source.get_object(bucket: source_bucket, key: "registry/images/#{image_id}/ancestry") # Ancestry includes self JSON.load(ancestry_resp.body.read).each do |image| unless @synced_images.include? "#{image}:#{region}:#{bucket}" sync_prefix(s3_source, s3_target, bucket, sse, "registry/images/#{image}/", source_bucket) @synced_images << "#{image}:#{region}:#{bucket}" end end end
sync_key_consumer()
click to toggle source
# File lib/docker/registry/sync/s3.rb, line 122 def sync_key_consumer @config.logger.info "Starting sync consumer..." loop do break if @producer_finished && @work_queue.length == 0 t_index = nil begin sleep 0.1 busy = @threads.select { |t| t.nil? || t.status == false || t['finished'].nil? == false }.length == 0 end until !busy t_index = @threads.rindex { |t| t.nil? || t.status == false || t['finished'].nil? == false } begin opts = @work_queue.pop(true) rescue ThreadError @config.logger.info "No work found on the queue, sleeping..." sleep 1 else if opts[:key] @threads[t_index].join unless @threads[t_index].nil? @threads[t_index] = Thread.new do @config.logger.info "Worker syncing key: #{opts[:key]}" target_client = Aws::S3::Client.new(region: opts[:region]) opts.delete :region success = false begin target_client.copy_object(opts) success = true @config.logger.info "Worker finished syncing key: #{opts[:key]}" rescue StandardError => e @config.logger.error "An unknown error occoured while copying object in s3: #{e}" @config.logger.error e.backtrace ensure Thread.current['finished'] = true @status_queue << success end end else @config.logger.info "Queued work empty: #{opts}" end end end end
sync_keys(target_client, target_bucket, target_sse, keys, source_bucket)
click to toggle source
# File lib/docker/registry/sync/s3.rb, line 103 def sync_keys(target_client, target_bucket, target_sse, keys, source_bucket) keys.each do |key| @config.logger.info "Syncing key #{source_bucket}/#{key} to bucket #{target_bucket}" opts = {acl: 'bucket-owner-full-control', region: target_client.config[:region], bucket: target_bucket, key: key, copy_source: "#{source_bucket}/#{key}"} if @config.sse || target_sse opts[:server_side_encryption] = 'AES256' end if @config.source_sse opts[:copy_source_sse_customer_algorithm] = 'AES256' end @work_queue << opts sleep 0.1 end end
sync_prefix(source_client, target_client, target_bucket, target_sse, prefix, source_bucket)
click to toggle source
# File lib/docker/registry/sync/s3.rb, line 86 def sync_prefix(source_client, target_client, target_bucket, target_sse, prefix, source_bucket) keys = [] img_resp = source_client.list_objects(bucket: source_bucket, prefix: prefix) loop do img_resp.contents.each do |item| keys << item.key end if img_resp.last_page? break else img_resp.next_page end end sync_keys(target_client, target_bucket, target_sse, keys, source_bucket) end
sync_repo(repo, bucket, region, sse, source_bucket = nil, source_region = nil)
click to toggle source
# File lib/docker/registry/sync/s3.rb, line 46 def sync_repo(repo, bucket, region, sse, source_bucket = nil, source_region = nil) source_region ||= @config.source_region source_bucket ||= @config.source_bucket s3_source = Aws::S3::Client.new(region: source_region) s3_target = Aws::S3::Client.new(region: region) begin rep_prefix = "registry/repositories/#{repo}/" sync_prefix(s3_source, s3_target, bucket, sse, rep_prefix, source_bucket) img_index_resp = s3_source.get_object(bucket: source_bucket, key: "registry/repositories/#{repo}/_index_images") JSON.load(img_index_resp.body.read).each do |image| sync_image(image['id'], bucket, region, sse, source_bucket, source_region) end rescue StandardError => e @config.logger.error "An unexpected error occoured while syncing repo #{repo}: #{e}" @config.logger.error e.backtrace false else true end end
sync_tag(image, tag, bucket, region, sse, source_bucket = nil, source_region = nil)
click to toggle source
# File lib/docker/registry/sync/s3.rb, line 22 def sync_tag(image, tag, bucket, region, sse, source_bucket = nil, source_region = nil) source_region ||= @config.source_region source_bucket ||= @config.source_bucket s3_source = Aws::S3::Client.new(region: source_region) s3_target = Aws::S3::Client.new(region: region) begin keys = ["tag#{tag}_json", "tag_#{tag}", '_index_images'].map do |key| "registry/repositories/#{image}/#{key}" end sync_keys(s3_target, bucket, sse, keys, source_bucket) img_id = s3_source.get_object(bucket: source_bucket, key: "registry/repositories/#{image}/tag_#{tag}").body.read sync_image(img_id, bucket, region, sse, source_bucket, source_region) rescue StandardError => e @config.logger.error "An unexpected error occoured while syncing tag #{image}:#{tag}: #{e}" @config.logger.error e.backtrace false else true end end