class RateThrottleClient::Demo

Constants

DURATION
MINUTE
PROCESS_COUNT
RACKUP_FILE
THREAD_COUNT
TIME_SCALE

Attributes

log_dir[R]
rackup_file[R]

Public Class Methods

new(client:,thread_count: THREAD_COUNT, process_count: PROCESS_COUNT, duration: DURATION, log_dir: nil, time_scale: TIME_SCALE, stream_requests: false, json_duration: 30, rackup_file: RACKUP_FILE, starting_limit: 0, remaining_stop_under: nil) click to toggle source
# File lib/rate_throttle_client/demo.rb, line 63
def initialize(client:,thread_count: THREAD_COUNT, process_count: PROCESS_COUNT, duration: DURATION, log_dir: nil, time_scale: TIME_SCALE, stream_requests: false, json_duration: 30, rackup_file: RACKUP_FILE, starting_limit: 0, remaining_stop_under: nil)
  @client = client
  @thread_count = thread_count
  @process_count = process_count
  @duration = duration
  @time_scale = time_scale.to_f
  @stream_requests = stream_requests
  @rackup_file = rackup_file
  @starting_limit = starting_limit
  @remaining_stop_under = remaining_stop_under

  if log_dir
    @log_dir = Pathname.new(log_dir)
  else
    @log_dir = Pathname.new(__dir__).join("../../logs/clients/#{Time.now.strftime('%Y-%m-%d-%H-%M-%s-%N')}-#{client.class}")
  end

  @mutex = Mutex.new
  @json_duration = 30 # seconds
  @port = UniquePort.call
  @threads = []
  @pids = []
  Timecop.scale(@time_scale)

  FileUtils.mkdir_p(@log_dir)
end

Public Instance Methods

call() click to toggle source
# File lib/rate_throttle_client/demo.rb, line 130
def call
  WaitForIt.new("bundle exec puma #{@rackup_file.to_s} -p #{@port}", env: {"TIME_SCALE" => @time_scale.to_i.to_s, "STARTING_LIMIT" => @starting_limit.to_s}, wait_for: "Use Ctrl-C to stop") do |spawn|
    @process_count.times.each do
      boot_process
    end

    @pids.map { |pid| Process.wait(pid) }
  end
end
chart(open_file) click to toggle source
# File lib/rate_throttle_client/demo.rb, line 106
def chart(open_file)
  chart = RateThrottleClient::Chart.new(log_dir: @log_dir, name: @client.class.to_s.gsub("RateThrottleClient::", ""), time_scale: @time_scale)
  chart.call(open_file)
end
print_results(io = STDOUT) click to toggle source
results() click to toggle source
# File lib/rate_throttle_client/demo.rb, line 111
def results
  result_hash = {}

  @log_dir.entries.map do |entry|
    @log_dir.join(entry)
  end.select do |file|
    file.file? && file.extname == ".json"
  end.sort.map do |file|
    JSON.parse(file.read)
  end.each do |json|
    json.each_key do |key|
      result_hash[key] ||= []
      result_hash[key] << json[key]
    end
  end

  result_hash
end

Private Instance Methods

boot_process() click to toggle source
# File lib/rate_throttle_client/demo.rb, line 140
        def boot_process
  @pids << fork do
    run_threads
  end
end
make_request() click to toggle source
# File lib/rate_throttle_client/demo.rb, line 239
        def make_request
  req = Excon.get("http://localhost:#{@port}")

  raise "Got unexpected reponse #{req.status}. #{req.inspect}" if req.status != 200 &&  req.status != 429
  req
rescue Excon::Error::Timeout => e
  puts e.inspect
  puts "retrying"
  retry
end
monkey_patch_client_sleep() click to toggle source
# File lib/rate_throttle_client/demo.rb, line 215
        def monkey_patch_client_sleep
  @mutex.synchronize do
    if !@client.instance_variables.include?(:"@time_scale")
      def @client.sleep(val)
        @max_sleep_val = val if val > @max_sleep_val
        Thread.current.thread_variable_set("last_sleep_value", val)

        super val/@time_scale
      end

      def @client.max_sleep_val
        @max_sleep_val
      end

      def @client.last_sleep
        @last_sleep || 0
      end
    end

    @client.instance_variable_set(:"@time_scale", @time_scale)
    @client.instance_variable_set(:"@max_sleep_val", 0)
  end
end
run_client_single() click to toggle source
# File lib/rate_throttle_client/demo.rb, line 172
        def run_client_single
  end_at_time = Time.now + @duration
  json_at_time = Time.now + @json_duration
  request_count = 0
  retry_count = 0

  monkey_patch_client_sleep
  loop do
    begin_time = Time.now
    break if begin_time > end_at_time

    if begin_time > json_at_time
      write_json_value(retry_count: retry_count, request_count: request_count, max_sleep_val: @client.max_sleep_val)
      json_at_time = begin_time + @json_duration
    end

    req = nil
    @client.call do
      request_count += 1

      req = make_request

      retry_count += 1 if req.status == 429

      stream_requests(req, retry_count: retry_count, request_count: request_count) if @stream_requests
      req
    end

    if @remaining_stop_under
      break if (req.headers["RateLimit-Remaining"].to_i <= @remaining_stop_under)
    end
  end
  stop_all_theads!

rescue Excon::Error::Socket => e
  raise e
rescue TimeIsUpError
  # Since the sleep time can be very high, we need a way to notify sleeping threads they can stop
  # When this exception is raised, do nothing and exit
ensure
  write_json_value(retry_count: retry_count, request_count: request_count, max_sleep_val: @client.max_sleep_val)
end
run_threads() click to toggle source
# File lib/rate_throttle_client/demo.rb, line 146
        def run_threads
  @thread_count.times.each do
    @threads << Thread.new do
      run_client_single
    end
  end

  # Chart support, print out the sleep value in 1 second increments to a file
  Thread.new do
    loop do
      @threads.each do |thread|
        sleep_for = thread.thread_variable_get("last_sleep_value") || 0

        File.open(@log_dir.join("#{Process.pid}:#{thread.object_id}-chart-data.txt"), 'a') do |f|
          f.puts(sleep_for)
        end
      end
      sleep 1 # time gets adjusted via TIME_SCALE later in time.rb
    end
  end

  @threads.map(&:join)
end
stop_all_theads!() click to toggle source

Even though all clients might have reached their `end_time` they might be stuck in a long `sleep`. This method signals to any threads that might be stuck in a `sleep` to stop via an exception that we raise and catch

# File lib/rate_throttle_client/demo.rb, line 265
        def stop_all_theads!
  @threads.each do |t|
    next if @remaining_stop_under

    if t != Thread.current && t.backtrace_locations && t.backtrace_locations.first.label == "sleep"
      t.raise(TimeIsUpError)
    end
  end
end
stream_requests(request, retry_count:, request_count:) click to toggle source
# File lib/rate_throttle_client/demo.rb, line 250
        def stream_requests(request, retry_count:, request_count:)
  status_string = String.new
  status_string << "#{Process.pid}##{Thread.current.object_id}: "
  status_string << "status=#{request.status} "
  status_string << "remaining=#{request.headers["RateLimit-Remaining"]} "
  status_string << "retry_count=#{retry_count} "
  status_string << "request_count=#{request_count} "
  status_string << "max_sleep_val=#{ sprintf("%.2f", @client.max_sleep_val) } "

  puts status_string
end
write_json_value(retry_count:, request_count:, max_sleep_val:) click to toggle source
# File lib/rate_throttle_client/demo.rb, line 275
        def write_json_value(retry_count:, request_count:, max_sleep_val:)
  results = {
    max_sleep_val: max_sleep_val,
    retry_ratio: retry_count / request_count.to_f,
    request_count: request_count
  }

  File.open(@log_dir.join("#{Process.pid}:#{Thread.current.object_id}.json"), 'w+') do |f|
    f.puts(results.to_json)
  end
rescue TimeIsUpError
  retry
end