class Scaltainer::Runner
Public Class Methods
new(configfile, statefile, logger, wait, orchestrator, pushgateway)
click to toggle source
# File lib/scaltainer/runner.rb, line 8 def initialize(configfile, statefile, logger, wait, orchestrator, pushgateway) @orchestrator = orchestrator @logger = logger @default_service_config = { "min" => 0, "upscale_quantity" => 1, "downscale_quantity" => 1, "upscale_sensitivity" => 1, "downscale_sensitivity" => 1 } @logger.debug "Scaltainer initialized with configuration file: #{configfile}, and state file: #{statefile}" config = YAML.load_file configfile Docker.logger = @logger if orchestrator == :swarm state = get_state(statefile) || {} endpoint = config["endpoint"] service_type_web = ServiceTypeWeb.new(endpoint) service_type_worker = ServiceTypeWorker.new(endpoint) register_pushgateway(pushgateway) if pushgateway namespace = config["namespace"] || config["stack_name"] loop do run config, state, service_type_web, service_type_worker, namespace save_state statefile, state sync_pushgateway(namespace, state) if pushgateway sleep wait break if wait == 0 end end
Private Instance Methods
get_service(service_name, namespace)
click to toggle source
# File lib/scaltainer/runner.rb, line 100 def get_service(service_name, namespace) begin service = if @orchestrator == :swarm DockerService.new service_name, namespace elsif @orchestrator == :kubernetes KubeResource.new service_name, namespace end rescue => e raise NetworkError.new "Could not find resource with name #{service_name} in namespace #{namespace}: #{e.message}" end raise ConfigurationError.new "Unknown resource: #{service_name} in namespace #{namespace}" unless service service end
get_state(statefile)
click to toggle source
# File lib/scaltainer/runner.rb, line 43 def get_state(statefile) YAML.load_file statefile if File.exists? statefile end
iterate_services(services, namespace, type, state)
click to toggle source
# File lib/scaltainer/runner.rb, line 51 def iterate_services(services, namespace, type, state) begin metrics = type.get_metrics services @logger.debug "Retrieved metrics for #{type} resources: #{metrics}" services.each do |service_name, service_config| begin state[service_name] ||= {} service_state = state[service_name] @logger.debug "Resource #{service_name} in namespace #{namespace} currently has state: #{service_state}" service_config = @default_service_config.merge service_config @logger.debug "Resource #{service_name} in namespace #{namespace} configuration: #{service_config}" process_service service_name, service_config, service_state, namespace, type, metrics rescue RuntimeError => e # skipping service log_exception e end end rescue RuntimeError => e # skipping service type log_exception e end end
log_exception(e)
click to toggle source
# File lib/scaltainer/runner.rb, line 74 def log_exception(e) @logger.log (e.class == Scaltainer::Warning ? Logger::WARN : Logger::ERROR), e.message end
process_service(service_name, config, state, namespace, type, metrics)
click to toggle source
# File lib/scaltainer/runner.rb, line 78 def process_service(service_name, config, state, namespace, type, metrics) service = get_service service_name, namespace @logger.debug "Found #{service.type} at orchestrator with name '#{service.name}' and id '#{service.id}'" current_replicas = service.get_replicas @logger.debug "#{service.type.capitalize} #{service.name} is currently configured for #{current_replicas} replica(s)" metric = metrics[service.name] raise Scaltainer::Warning.new("Configured #{service.type} '#{service.name}' not found in metrics endpoint") unless metric state["metric"] = metric state["service_type"] = type.to_s desired_replicas = type.determine_desired_replicas metric, config, current_replicas @logger.debug "Desired number of replicas for #{service.type} #{service.name} is #{desired_replicas}" adjusted_replicas = type.adjust_desired_replicas(desired_replicas, config) @logger.debug "Desired number of replicas for #{service.type} #{service.name} is adjusted to #{adjusted_replicas}" replica_diff = adjusted_replicas - current_replicas state["replicas"] = current_replicas type.yield_to_scale(replica_diff, config, state, metric, service.name, @logger) do scale_out service, current_replicas, adjusted_replicas state["replicas"] = adjusted_replicas end end
register_pushgateway(pushgateway)
click to toggle source
# File lib/scaltainer/runner.rb, line 125 def register_pushgateway(pushgateway) @registry = Prometheus::Client.registry @web_replicas_gauge = @registry.gauge(:scaltainer_web_replicas_total, docstring: 'Scaltainer controller replicas for web services', labels: [:controller, :namespace]) @worker_replicas_gauge = @registry.gauge(:scaltainer_worker_replicas_total, docstring: 'Scaltainer controller replicas for worker services', labels: [:controller, :namespace]) @web_metrics_gauge = @registry.gauge(:scaltainer_web_response_time_seconds, docstring: 'Scaltainer controller response time metric in seconds', labels: [:controller, :namespace]) @worker_metrics_gauge = @registry.gauge(:scaltainer_worker_queue_size_total, docstring: 'Scaltainer controller queue size metric', labels: [:controller, :namespace]) @ticks_counter = @registry.counter(:scaltainer_ticks_total, docstring: 'Scaltainer ticks', labels: [:namespace]) @pushgateway = Prometheus::Client::Push.new("scaltainer", Socket.gethostname, "http://#{pushgateway}") end
run(config, state, service_type_web, service_type_worker, namespace)
click to toggle source
# File lib/scaltainer/runner.rb, line 38 def run(config, state, service_type_web, service_type_worker, namespace) iterate_services config["web_services"], namespace, service_type_web, state iterate_services config["worker_services"], namespace, service_type_worker, state end
save_state(statefile, state)
click to toggle source
# File lib/scaltainer/runner.rb, line 47 def save_state(statefile, state) File.write(statefile, state.to_yaml) end
scale_out(service, current_replicas, desired_replicas)
click to toggle source
# File lib/scaltainer/runner.rb, line 114 def scale_out(service, current_replicas, desired_replicas) return if current_replicas == desired_replicas # send scale command to orchestrator @logger.info "Scaling #{service.type} #{service.name} from #{current_replicas} to #{desired_replicas}" begin service.set_replicas desired_replicas rescue => e raise NetworkError.new "Could not scale #{service.type} #{service.name} due to error: #{e.message}" end end
sync_pushgateway(namespace, state)
click to toggle source
# File lib/scaltainer/runner.rb, line 136 def sync_pushgateway(namespace, state) @logger.debug("Now syncing state #{state} in namespace #{namespace}") factor = 1 state.each do |service, state| if state["service_type"] == 'Web' replicas_gauge = @web_replicas_gauge metrics_gauge = @web_metrics_gauge factor = 0.001 else replicas_gauge = @worker_replicas_gauge metrics_gauge = @worker_metrics_gauge end replicas_gauge.set(state["replicas"], labels: {namespace: namespace, controller: service}) metrics_gauge.set(state["metric"] * factor, labels: {namespace: namespace, controller: service}) end @ticks_counter.increment(labels: {namespace: namespace}) begin @pushgateway.add(@registry) rescue => e @logger.warn "[#{e.class}] Error pushing metrics to the configured Prometheus Push Gateway: #{e.message}" end @logger.info "Pushed metrics successfully to the configured Prometheus Push Gateway" end