class RateThrottleClient::Demo
Constants
- DURATION
- MINUTE
- PROCESS_COUNT
- RACKUP_FILE
- THREAD_COUNT
- TIME_SCALE
Attributes
log_dir[R]
rackup_file[R]
Public Class Methods
new(client:,thread_count: THREAD_COUNT, process_count: PROCESS_COUNT, duration: DURATION, log_dir: nil, time_scale: TIME_SCALE, stream_requests: false, json_duration: 30, rackup_file: RACKUP_FILE, starting_limit: 0, remaining_stop_under: nil)
click to toggle source
# File lib/rate_throttle_client/demo.rb, line 63
# Builds a demo run for the given rate-throttle client.
#
# Stores the run configuration, picks a log directory (creating it on disk),
# reserves a port via UniquePort, and scales time through Timecop.
#
# @param client the client under test; `run_client_single` invokes its `call` with a block
# @param thread_count number of client threads started per process
# @param process_count number of processes forked by `call`
# @param duration how long each client thread keeps issuing requests
#   (printed as `duration / 60.0` minutes by `print_results`)
# @param log_dir directory for JSON/chart output; when nil a timestamped
#   directory under `../../logs/clients/` (relative to this file) is used
# @param time_scale factor handed to `Timecop.scale` (stored as a Float)
# @param stream_requests when truthy, every request is printed to stdout
# @param json_duration seconds between periodic JSON snapshots
# @param rackup_file rackup file passed to the puma command in `call`
# @param starting_limit exported to the server as STARTING_LIMIT
# @param remaining_stop_under when set, a client thread stops once the
#   `RateLimit-Remaining` header is at or below this value
def initialize(client:, thread_count: THREAD_COUNT, process_count: PROCESS_COUNT, duration: DURATION, log_dir: nil, time_scale: TIME_SCALE, stream_requests: false, json_duration: 30, rackup_file: RACKUP_FILE, starting_limit: 0, remaining_stop_under: nil)
  @client = client
  @thread_count = thread_count
  @process_count = process_count
  @duration = duration
  @time_scale = time_scale.to_f
  @stream_requests = stream_requests
  @rackup_file = rackup_file
  @starting_limit = starting_limit
  @remaining_stop_under = remaining_stop_under

  @log_dir =
    if log_dir
      Pathname.new(log_dir)
    else
      # NOTE(review): `%s` is seconds since the epoch; `%S` (seconds of the
      # minute) may have been intended. Left unchanged to keep directory names stable.
      stamp = Time.now.strftime('%Y-%m-%d-%H-%M-%s-%N')
      Pathname.new(__dir__).join("../../logs/clients/#{stamp}-#{client.class}")
    end

  @mutex = Mutex.new
  # Honor the caller-supplied interval (it used to be hard-coded to 30).
  @json_duration = json_duration # seconds
  @port = UniquePort.call
  @threads = []
  @pids = []

  Timecop.scale(@time_scale)
  FileUtils.mkdir_p(@log_dir)
end
Public Instance Methods
call()
click to toggle source
# File lib/rate_throttle_client/demo.rb, line 130
# Boots the puma server through WaitForIt, forks `@process_count` client
# processes, and waits for every forked process to exit.
def call
  server_command = "bundle exec puma #{@rackup_file} -p #{@port}"
  server_env = {
    "TIME_SCALE" => @time_scale.to_i.to_s,
    "STARTING_LIMIT" => @starting_limit.to_s
  }

  WaitForIt.new(server_command, env: server_env, wait_for: "Use Ctrl-C to stop") do |_spawn|
    @process_count.times { boot_process }
    @pids.map { |pid| Process.wait(pid) }
  end
end
chart(open_file)
click to toggle source
# File lib/rate_throttle_client/demo.rb, line 106
# Builds a RateThrottleClient::Chart for this run's log directory and invokes it.
#
# @param open_file forwarded unchanged to `Chart#call`
def chart(open_file)
  # The chart is named after the client class with the namespace stripped.
  client_name = @client.class.to_s.gsub("RateThrottleClient::", "")

  RateThrottleClient::Chart
    .new(log_dir: @log_dir, name: client_name, time_scale: @time_scale)
    .call(open_file)
end
print_results(io = STDOUT)
click to toggle source
# File lib/rate_throttle_client/demo.rb, line 90
# Prints a markdown-formatted summary of the aggregated results.
#
# The summary relies on `mean` and `stdev` being available on the result
# arrays (not core Ruby; presumably provided by a statistics extension
# loaded elsewhere — confirm).
#
# When no results have been recorded, the header is followed by a short
# notice instead of failing with NoMethodError on a missing key.
#
# @param io [#puts] destination for the report (defaults to STDOUT)
# @return [nil]
def print_results(io = STDOUT)
  result_hash = results

  io.puts
  io.puts "### #{@client.class} results (duration: #{@duration/60.0} minutes, multiplier: #{@client.multiplier})"
  io.puts

  if result_hash.empty?
    io.puts "No results found in #{@log_dir}"
    return
  end

  io.puts "```"
  io.puts "Avg retry rate: #{format("%.2f", result_hash["retry_ratio"].mean * 100)} %"
  io.puts "Max sleep time: #{format("%.2f", result_hash["max_sleep_val"].max)} seconds"
  io.puts "Stdev Request Count: #{format("%.2f", result_hash["request_count"].stdev)}"
  io.puts
  result_hash.each do |key, values|
    formatted = values.map { |value| format("%.2f", value) }.join(", ")
    io.puts "Raw #{key}s: [#{formatted}]"
  end
  io.puts "```"
end
results()
click to toggle source
# File lib/rate_throttle_client/demo.rb, line 111
# Collects every `*.json` file in the log directory into one hash.
#
# Files are read in sorted path order; for each key found in a file, the
# value is appended to that key's array.
#
# @return [Hash{String => Array}] key => values gathered across all files
def results
  json_files = @log_dir.children.select do |path|
    path.file? && path.extname == ".json"
  end

  json_files.sort.each_with_object({}) do |path, collected|
    JSON.parse(path.read).each_pair do |key, value|
      (collected[key] ||= []) << value
    end
  end
end
Private Instance Methods
boot_process()
click to toggle source
# File lib/rate_throttle_client/demo.rb, line 140
# Forks a child process that runs the client threads and records its pid
# so `call` can wait on it.
def boot_process
  child_pid = fork { run_threads }
  @pids << child_pid
end
make_request()
click to toggle source
# File lib/rate_throttle_client/demo.rb, line 239
# Issues a GET against the local demo server on `@port`.
#
# Statuses 200 and 429 are the only ones accepted; anything else is
# treated as a failure of the demo itself.
#
# @return the Excon response for a 200 or 429 status
# @raise [RuntimeError] when the server answers with any other status
def make_request
  response = Excon.get("http://localhost:#{@port}")
  unless [200, 429].include?(response.status)
    raise "Got unexpected response #{response.status}. #{response.inspect}"
  end

  response
rescue Excon::Error::Timeout => e
  # Timeouts are logged and retried without an upper bound.
  puts e.inspect
  puts "retrying"
  retry
end
monkey_patch_client_sleep()
click to toggle source
# File lib/rate_throttle_client/demo.rb, line 215
# Instruments `@client` with a singleton `sleep` that records sleep values
# and then sleeps for the time-scaled duration.
#
# Every client thread calls this method, so the patch and its state are set
# up only once (guarded by `@mutex`). Previously `@max_sleep_val` was reset
# to 0 on every call, which let a later-starting thread erase the maximum
# already recorded by earlier threads.
def monkey_patch_client_sleep
  @mutex.synchronize do
    # `@time_scale` on the client doubles as the "already patched" marker.
    unless @client.instance_variable_defined?(:@time_scale)
      # Records the requested sleep, then delegates to the client's own
      # `sleep` with the value divided by the time scale.
      def @client.sleep(val)
        @max_sleep_val = val if val > @max_sleep_val
        Thread.current.thread_variable_set("last_sleep_value", val)
        super(val / @time_scale)
      end

      # Largest value passed to `sleep` since the client was patched.
      def @client.max_sleep_val
        @max_sleep_val
      end

      # NOTE(review): `@last_sleep` is never assigned in this method, so this
      # returns 0 unless the client sets it itself — confirm whether it is still needed.
      def @client.last_sleep
        @last_sleep || 0
      end

      @client.instance_variable_set(:@time_scale, @time_scale)
      @client.instance_variable_set(:@max_sleep_val, 0)
    end
  end
end
run_client_single()
click to toggle source
# File lib/rate_throttle_client/demo.rb, line 172
# Body of a single client thread: issues requests through `@client.call`
# until `@duration` has elapsed (or the remaining-limit threshold is hit),
# writing a JSON snapshot every `@json_duration` seconds and once more on exit.
def run_client_single
  end_at_time = Time.now + @duration
  json_at_time = Time.now + @json_duration
  request_count = 0
  retry_count = 0

  monkey_patch_client_sleep

  loop do
    now = Time.now
    break if now > end_at_time

    # Periodic snapshot of the counters gathered so far.
    if now > json_at_time
      write_json_value(retry_count: retry_count, request_count: request_count, max_sleep_val: @client.max_sleep_val)
      json_at_time = now + @json_duration
    end

    response = nil
    @client.call do
      request_count += 1
      response = make_request
      retry_count += 1 if response.status == 429
      if @stream_requests
        stream_requests(response, retry_count: retry_count, request_count: request_count)
      end
      response
    end

    # Optional early exit once the server reports a low remaining limit.
    break if @remaining_stop_under && response.headers["RateLimit-Remaining"].to_i <= @remaining_stop_under
  end

  stop_all_theads!
rescue Excon::Error::Socket
  raise
rescue TimeIsUpError
  # Raised by another thread via `stop_all_theads!` to interrupt a long
  # sleep; nothing to do except fall through to the final snapshot.
ensure
  write_json_value(retry_count: retry_count, request_count: request_count, max_sleep_val: @client.max_sleep_val)
end
run_threads()
click to toggle source
# File lib/rate_throttle_client/demo.rb, line 146
# Starts `@thread_count` client threads plus one background thread that
# records chart data, then waits for the client threads to finish.
def run_threads
  @thread_count.times do
    @threads << Thread.new { run_client_single }
  end

  # Chart support: once per second, append each client thread's most recent
  # sleep value to a per-thread data file.
  Thread.new do
    loop do
      @threads.each do |worker|
        last_sleep = worker.thread_variable_get("last_sleep_value") || 0
        chart_path = @log_dir.join("#{Process.pid}:#{worker.object_id}-chart-data.txt")
        File.open(chart_path, 'a') { |file| file.puts(last_sleep) }
      end

      sleep 1 # time gets adjusted via TIME_SCALE later in time.rb
    end
  end

  @threads.map(&:join)
end
stop_all_theads!()
click to toggle source
Even though all clients might have reached their `end_time`, they might still be stuck in a long `sleep`. This method signals any threads that might be stuck in a `sleep` to stop by raising an exception in them, which those threads then rescue.
# File lib/rate_throttle_client/demo.rb, line 265
# Interrupts every other client thread that is currently inside `sleep` by
# raising TimeIsUpError in it.
#
# Does nothing when `@remaining_stop_under` is set.
#
# The innermost frame is matched on `base_label` rather than `label`:
# `label` may be decorated with the owning class (e.g. "Kernel#sleep" on
# Ruby 3.4+), while `base_label` stays the bare method name. Safe navigation
# also covers a thread whose backtrace is nil or empty.
#
# @return [Array<Thread>] the tracked threads
def stop_all_theads!
  return @threads if @remaining_stop_under

  @threads.each do |thread|
    next if thread == Thread.current

    innermost_frame = thread.backtrace_locations&.first
    thread.raise(TimeIsUpError) if innermost_frame&.base_label == "sleep"
  end
end
stream_requests(request, retry_count:, request_count:)
click to toggle source
# File lib/rate_throttle_client/demo.rb, line 250
# Prints a one-line summary of a request to stdout, prefixed with the
# current process id and thread object id.
#
# @param request response object exposing `status` and `headers`
# @param retry_count number of retries recorded so far
# @param request_count number of requests made so far
def stream_requests(request, retry_count:, request_count:)
  fields = [
    "#{Process.pid}##{Thread.current.object_id}:",
    "status=#{request.status}",
    "remaining=#{request.headers["RateLimit-Remaining"]}",
    "retry_count=#{retry_count}",
    "request_count=#{request_count}",
    "max_sleep_val=#{format("%.2f", @client.max_sleep_val)}"
  ]

  # Every field, including the last, is followed by a single space.
  puts fields.map { |field| "#{field} " }.join
end
write_json_value(retry_count:, request_count:, max_sleep_val:)
click to toggle source
# File lib/rate_throttle_client/demo.rb, line 275
# Writes the current thread's counters to
# `<log_dir>/<pid>:<thread object id>.json`, replacing any previous snapshot.
#
# With zero requests the retry ratio is reported as 0.0. Dividing by zero
# would give NaN, which `to_json` refuses to serialize — and this method is
# called from an `ensure` block, where that error would mask the real one.
#
# @param retry_count number of retried (429) requests
# @param request_count total number of requests made
# @param max_sleep_val largest sleep value recorded for the client
def write_json_value(retry_count:, request_count:, max_sleep_val:)
  total_requests = request_count.to_f
  retry_ratio = total_requests.zero? ? 0.0 : retry_count / total_requests

  results = {
    max_sleep_val: max_sleep_val,
    retry_ratio: retry_ratio,
    request_count: request_count
  }

  File.open(@log_dir.join("#{Process.pid}:#{Thread.current.object_id}.json"), 'w+') do |f|
    f.puts(results.to_json)
  end
rescue TimeIsUpError
  # Another thread may interrupt this one mid-write; start the write over.
  retry
end