class Prefab::ConfigClient
Constants
- DEFAULT_CHECKPOINT_FREQ_SEC
- DEFAULT_S3CF_BUCKET
- RECONNECT_WAIT
Public Class Methods
new(base_client, timeout)
click to toggle source
# File lib/prefab/config_client.rb, line 7
#
# Builds the client's collaborators, takes the initialization write lock,
# performs a first checkpoint load and starts the background checkpointing
# thread.
#
# The write lock acquired here is not released in this method; finish_init!
# releases it once config has been loaded from some source. Readers in #get
# take the read side of the same lock.
#
# base_client - the owning client; passed to the loader, resolver and
#               interceptor built here.
# timeout     - stored in @timeout (used as the request timeout in #upsert).
def initialize(base_client, timeout)
  @base_client = base_client
  @timeout = timeout
  @checkpoint_freq_secs = DEFAULT_CHECKPOINT_FREQ_SEC
  # The PREFAB_S3CF_BUCKET environment variable overrides the default bucket.
  @s3_cloud_front = ENV.fetch("PREFAB_S3CF_BUCKET", DEFAULT_S3CF_BUCKET)

  @config_loader = Prefab::ConfigLoader.new(@base_client)
  @config_resolver = Prefab::ConfigResolver.new(@base_client, @config_loader)
  @cancellable_interceptor = Prefab::CancellableInterceptor.new(@base_client)

  # Must be held before the first load so finish_init! has a lock to release.
  @initialization_lock = Concurrent::ReadWriteLock.new
  @initialization_lock.acquire_write_lock

  load_checkpoint
  start_checkpointing_thread
end
value_to_delta(key, config_value, namespace = nil)
click to toggle source
# File lib/prefab/config_client.rb, line 60
#
# Wraps a key/value pair in a Prefab::ConfigDelta.
#
# The delta's key is "namespace:key" when a namespace is given, and just the
# key otherwise (nil parts are dropped before joining with ":").
#
# key          - the config key.
# config_value - the value to carry in the delta.
# namespace    - optional namespace prefix (default: nil).
#
# Returns the new Prefab::ConfigDelta.
def self.value_to_delta(key, config_value, namespace = nil)
  qualified_key = [namespace, key].compact.join(":")
  Prefab::ConfigDelta.new(key: qualified_key, value: config_value)
end
Public Instance Methods
get(prop)
click to toggle source
# File lib/prefab/config_client.rb, line 31
#
# Looks up a property through the config resolver while holding the
# initialization read lock.
#
# prop - the property to resolve.
#
# Returns whatever the resolver's #get returns for prop.
def get(prop)
  @initialization_lock.with_read_lock { @config_resolver.get(prop) }
end
reset()
click to toggle source
# File lib/prefab/config_client.rb, line 51
#
# Resets the base client and drops the stub reference held in @_stub.
#
# Returns nil.
def reset
  @base_client.reset!

  @_stub = nil
end
start_streaming()
click to toggle source
# File lib/prefab/config_client.rb, line 26
#
# Marks the client as streaming and starts the API connection thread,
# beginning at the config loader's current highwater mark.
#
# Returns whatever start_api_connection_thread returns.
def start_streaming
  @streaming = true

  resume_from = @config_loader.highwater_mark
  start_api_connection_thread(resume_from)
end
to_s()
click to toggle source
# File lib/prefab/config_client.rb, line 56
#
# String form of the client: delegates to the config resolver's #to_s.
def to_s
  @config_resolver.to_s
end
upsert(key, config_value, namespace = nil, previous_key = nil)
click to toggle source
# File lib/prefab/config_client.rb, line 37
#
# Sends an upsert for one config value to the config service, then mirrors
# the change into the local loader and refreshes the resolver.
#
# key          - config key; must not contain ":".
# config_value - value to store.
# namespace    - optional namespace; must not contain ":" (default: nil).
# previous_key - optional key this upsert replaces. When it is non-blank it
#                is sent with the request and removed from the local loader
#                (default: nil).
#
# Raises RuntimeError if key or namespace contains ":".
def upsert(key, config_value, namespace = nil, previous_key = nil)
  raise "Key must not contain ':' set namespaces separately" if key.include? ":"
  raise "Namespace must not contain ':'" if namespace&.include?(":")

  # Core-Ruby equivalent of ActiveSupport's `present?` for strings (at least
  # one non-whitespace character), so this method does not raise
  # NoMethodError when ActiveSupport is not loaded.
  replaces_key = previous_key ? previous_key.to_s.match?(/[^[:space:]]/) : false

  config_delta = Prefab::ConfigClient.value_to_delta(key, config_value, namespace)
  upsert_req = Prefab::UpsertRequest.new(config_delta: config_delta)
  upsert_req.previous_key = previous_key if replaces_key

  @base_client.request Prefab::ConfigService, :upsert, req_options: { timeout: @timeout }, params: upsert_req
  @base_client.stats.increment("prefab.config.upsert")

  # Apply the same change locally so reads reflect it right away.
  @config_loader.set(config_delta)
  @config_loader.rm(previous_key) if replaces_key
  @config_resolver.update
end
Private Instance Methods
finish_init!(source)
click to toggle source
# File lib/prefab/config_client.rb, line 145
#
# Releases the initialization write lock if it is still held, logging which
# source completed initialization, and hands this config client to the base
# client's logger. Does nothing when the lock is not write-locked.
#
# source - label of the source that supplied config; interpolated into the
#          debug log message.
def finish_init!(source)
  return unless @initialization_lock.write_locked?

  @base_client.log_internal Logger::DEBUG, "Unlocked Config via #{source}"
  @initialization_lock.release_write_lock
  @base_client.log.set_config_client(self)
end
load_checkpoint()
click to toggle source
Bootstrap out of the cache. Returns the high-watermark of what was in the cache.
# File lib/prefab/config_client.rb, line 76
#
# Loads a checkpoint from the config API and falls back to S3 when the API
# load does not report success. Any StandardError raised along the way is
# logged at WARN level rather than propagated.
def load_checkpoint
  return if load_checkpoint_from_config

  @base_client.log_internal Logger::INFO, "Fallback to S3"
  load_checkpoint_from_s3
rescue => e
  @base_client.log_internal Logger::WARN, "Unexpected problem loading checkpoint #{e}"
end
load_checkpoint_from_config()
click to toggle source
# File lib/prefab/config_client.rb, line 88
#
# Fetches all config from the config service, starting at the loader's
# current highwater mark, and loads the returned deltas.
#
# load_deltas stores each delta, updates the resolver and finishes
# initialization, so none of that is repeated here.
#
# Returns true on success, or false if any StandardError was raised (the
# error is logged at WARN level).
def load_checkpoint_from_config
  config_req = Prefab::ConfigServicePointer.new(account_id: @base_client.account_id,
                                                start_at_id: @config_loader.highwater_mark)

  resp = stub.get_all_config(config_req)
  load_deltas(resp, :api)
  true
rescue => e
  @base_client.log_internal Logger::WARN, "Unexpected problem loading checkpoint #{e}"
  false
end
load_checkpoint_from_s3()
click to toggle source
# File lib/prefab/config_client.rb, line 104
#
# Fetches a checkpoint over HTTP from the configured S3/CloudFront location
# and loads it when the response status is 200. Any other status is logged
# at INFO level and nothing is loaded.
#
# The URL path is the API key with every "|" replaced by "/".
def load_checkpoint_from_s3
  key_path = @base_client.api_key.tr("|", "/")
  resp = Faraday.get "#{@s3_cloud_front}/#{key_path}"

  unless resp.status == 200
    return @base_client.log_internal(Logger::INFO, "No S3 checkpoint. Response #{resp.status} Plan may not support this.")
  end

  load_deltas(Prefab::ConfigDeltas.decode(resp.body), :s3)
end
load_deltas(deltas, source)
click to toggle source
# File lib/prefab/config_client.rb, line 116
#
# Stores every delta from a checkpoint in the config loader, logs the
# resulting highwater mark, bumps the checkpoint-load counter, refreshes the
# resolver and finishes initialization.
#
# deltas - object whose #deltas yields the individual deltas to store.
# source - label of where the checkpoint came from; used in the log message
#          and passed on to finish_init!.
def load_deltas(deltas, source)
  deltas.deltas.each { |delta| @config_loader.set(delta) }

  highwater = @config_loader.highwater_mark
  @base_client.log_internal Logger::INFO, "Found checkpoint with highwater id #{highwater} from #{source}"
  @base_client.stats.increment("prefab.config.checkpoint.load")

  @config_resolver.update
  finish_init!(source)
end
start_api_connection_thread(start_at_id)
click to toggle source
Set up a streaming connection to the API. Save new config values into the loader.
# File lib/prefab/config_client.rb, line 155
#
# Starts a background thread that streams config from the API and applies
# every received batch of deltas to the loader and resolver.
#
# While @streaming is truthy the thread keeps (re)opening the stream. When
# the stream raises, the error is logged, the client is reset and the thread
# waits RECONNECT_WAIT seconds before trying again.
#
# start_at_id - id to start streaming from.
#
# Returns the started Thread (also stored in @api_connection_thread).
def start_api_connection_thread(start_at_id)
  config_req = Prefab::ConfigServicePointer.new(account_id: @base_client.account_id,
                                                start_at_id: start_at_id)
  @base_client.log_internal Logger::DEBUG, "start api connection thread #{start_at_id}"
  @base_client.stats.increment("prefab.config.api.start")

  @api_connection_thread = Thread.new do
    # Stop the loop and cancel the in-flight call when the process exits.
    at_exit do
      @streaming = false
      @cancellable_interceptor.cancel
    end

    while @streaming
      begin
        resp = stub.get_config(config_req)
        resp.each do |r|
          r.deltas.each do |delta|
            @config_loader.set(delta)
          end
          @config_resolver.update
          finish_init!(:streaming)
        end
      rescue => e
        if @streaming
          # Not every StandardError carries a status code, so check before
          # reading it; otherwise the rescue itself would raise and end this
          # thread. Code 1 is presumably gRPC's CANCELLED status
          # (NOTE(review): confirm), which is logged more quietly.
          quiet = e.respond_to?(:code) && e.code == 1
          level = quiet ? Logger::DEBUG : Logger::INFO
          @base_client.log_internal level, ("config client encountered #{e.message} pausing #{RECONNECT_WAIT}")
          reset
          sleep(RECONNECT_WAIT)
        end
      end
    end
  end
end
start_checkpointing_thread()
click to toggle source
A thread that checks for a checkpoint
# File lib/prefab/config_client.rb, line 127
#
# Starts a background thread that reloads the checkpoint in a loop, aiming
# for one load every @checkpoint_freq_secs seconds.
#
# The time spent inside load_checkpoint is subtracted from the pause, so the
# start time is taken before the load. A monotonic clock is used so
# wall-clock adjustments cannot distort the pause.
#
# Returns the started Thread.
def start_checkpointing_thread
  Thread.new do
    loop do
      begin
        started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
        load_checkpoint
        elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at

        remaining = @checkpoint_freq_secs - elapsed
        sleep(remaining) if remaining > 0
      rescue StandardError => exn
        @base_client.log_internal Logger::INFO, "Issue Checkpointing #{exn.message}"
      end
    end
  end
end
stub()
click to toggle source
# File lib/prefab/config_client.rb, line 67
#
# Builds a Prefab::ConfigService stub on the base client's channel, wired
# with the base client's interceptor and this client's cancellable
# interceptor, and stores it in @_stub.
#
# NOTE(review): this assigns with `=`, so a new stub is built on every call
# and the stored @_stub is never reused here — confirm that is intended.
#
# Returns the newly built stub.
def stub
  channel = @base_client.channel
  interceptors = [@base_client.interceptor, @cancellable_interceptor]

  @_stub = Prefab::ConfigService::Stub.new(nil,
                                           nil,
                                           channel_override: channel,
                                           interceptors: interceptors)
end