class Object
Constants
- LIMIT_ROWS
- TABLE_NAME
Public Instance Methods
benchmark(repeat=100)
click to toggle source
# File benchmarks/em_pg.rb, line 9
# Runs the benchmark suite and prints a Benchmark.bm report (label column
# width 20).
#
# Reports, in order:
# * +single(repeat)+
# * +parallel(repeat, chunk_size, concurrency)+ for each pair in +settings+
# * the same +parallel+ calls again while +patch_blocking+ is in effect
#
# repeat:: number of repetitions handed to +single+ and +parallel+.
#
# The patch is removed in an +ensure+ clause, so a failing "blocking" run does
# not leave the patched methods installed. Any exception still propagates.
def benchmark(repeat=100)
  # [chunk_size, concurrency] pairs used for both the parallel and blocking runs.
  settings = [[90000, 1], [5000, 5], [2000, 10], [1000, 20]]

  Benchmark.bm(20) do |b|
    b.report('single:') { single(repeat) }
    puts
    settings.each do |chunk_size, concurrency|
      b.report("parallel #{chunk_size}/#{concurrency}:") { parallel(repeat, chunk_size, concurrency) }
    end
    puts
    # Called outside the begin block: if patching itself fails there is
    # nothing to undo.
    patch_blocking
    begin
      settings.each do |chunk_size, concurrency|
        b.report("blocking #{chunk_size}/#{concurrency}:") { parallel(repeat, chunk_size, concurrency) }
      end
    ensure
      patch_remove_blocking
    end
  end
end
fibers(repeat, concurrency)
click to toggle source
# File benchmarks/single_row_mode.rb, line 46
# Streams query results from +concurrency+ fibers inside an EM.synchrony block.
#
# A lazy PG::EM::ConnectionPool of size +concurrency+ is created; each of the
# +concurrency+ iterations holds one pooled connection and calls
# +stream_results+ on it <tt>repeat / concurrency</tt> times. Afterwards the
# pool is finished and EM is stopped.
#
# repeat::      total number of runs, divided between the fibers.
# concurrency:: pool size, iteration count and FiberIterator concurrency.
def fibers(repeat, concurrency)
  EM.synchrony do
    pool = PG::EM::ConnectionPool.new(size: concurrency, lazy: true)
    slots = (0...concurrency)

    FiberIterator.new(slots, concurrency).each do
      pool.hold do |connection|
        (repeat / concurrency).times { stream_results(connection) }
      end
    end

    pool.finish
    EM.stop
  end
end
parallel(repeat=1, chunk_size=2000, concurrency=10)
click to toggle source
Retrieves resources using parallel queries.
# File benchmarks/em_pg.rb, line 71
# Retrieves resources using parallel queries.
#
# Inside an EM.synchrony block a PG::EM::ConnectionPool of size +concurrency+
# is created, the row count of +resources+ is queried, and the table is then
# read in chunks of +chunk_size+ rows (limit/offset, ordered by +cdate+)
# through an EM::Synchrony::FiberIterator. The whole read is repeated
# +repeat+ times; each chunk is written into its slot of the result array.
#
# repeat::      how many times the whole table is read.
# chunk_size::  number of rows requested per query.
# concurrency:: pool size and FiberIterator concurrency.
#
# Returns the array of row values assembled from the chunks.
#
# The pool is finished in an +ensure+ clause so that its connections are
# released even when a query raises.
def parallel(repeat=1, chunk_size=2000, concurrency=10)
  resources = []
  rowcount = 0
  EM.synchrony do
    # Named +pool+ rather than +p+ so that Kernel#p is not shadowed.
    pool = PG::EM::ConnectionPool.new size: concurrency
    begin
      pool.query('select count(*) from resources') do |result|
        rowcount = result.getvalue(0, 0).to_i
      end
      # Start offset of every chunk: 0, chunk_size, 2 * chunk_size, ... < rowcount.
      offsets = (0...rowcount).step(chunk_size).to_a
      repeat.times do
        EM::Synchrony::FiberIterator.new(offsets, concurrency).each do |offset|
          pool.query('select * from resources order by cdate limit $1 offset $2', [chunk_size, offset]) do |result|
            resources[offset, chunk_size] = result.values
          end
        end
      end
    ensure
      pool.finish
    end
    EM.stop
  end
  # raise "invalid count #{resources.length} != #{rowcount}" if resources.length != rowcount
  # raise "resources != $resources" if resources != $resources
  resources
end
patch_blocking()
click to toggle source
# File benchmarks/em_pg.rb, line 35
# Installs the "blocking" variant of result fetching used by the benchmark.
#
# * PG::Connection gains +blocking_get_last_result+, an alias of
#   +get_last_result+.
# * PG::EM::Client::Watcher keeps its current +fetch_results+ under the name
#   +original_fetch_results+ and gets a replacement that disables readable
#   notification, calls <tt>@client.blocking_get_last_result</tt> and
#   succeeds or fails <tt>@deferrable</tt> with the outcome.
# * +notify_readable+ is aliased to the replacement +fetch_results+.
#
# Calling this method while the patch is already installed is a no-op:
# re-aliasing would overwrite +original_fetch_results+ with the patched
# method and the real original could no longer be restored.
def patch_blocking
  watcher = PG::EM::Client::Watcher
  return if watcher.method_defined?(:original_fetch_results) ||
            watcher.private_method_defined?(:original_fetch_results)

  PG::Connection.class_eval do
    alias_method :blocking_get_last_result, :get_last_result
  end

  watcher.module_eval do
    alias_method :original_fetch_results, :fetch_results

    def fetch_results
      self.notify_readable = false
      begin
        result = @client.blocking_get_last_result
      rescue Exception => e
        # Every failure is handed to the deferrable rather than raised here.
        @deferrable.fail(e)
      else
        @deferrable.succeed(result)
      end
    end

    alias_method :notify_readable, :fetch_results
  end
end
patch_remove_blocking()
click to toggle source
# File benchmarks/em_pg.rb, line 27
# Restores PG::EM::Client::Watcher after the blocking patch.
#
# +fetch_results+ is pointed back at +original_fetch_results+,
# +notify_readable+ is re-aliased to the restored +fetch_results+, and the
# +original_fetch_results+ name is undefined again.
#
# Returns nil.
def patch_remove_blocking
  PG::EM::Client::Watcher.module_eval do
    alias_method :fetch_results, :original_fetch_results
    alias_method :notify_readable, :fetch_results
    undef_method :original_fetch_results
  end
  nil
end
single(repeat=1)
click to toggle source
Retrieves resources using a single select query.
# File benchmarks/em_pg.rb, line 56
# Retrieves resources using a single select query.
#
# Opens one connection, queries the row count of +resources+, then selects
# the whole table (ordered by +cdate+) +repeat+ times, storing the row values
# of the latest run in the global <tt>$resources</tt>.
#
# repeat:: how many times the full select is executed.
#
# Returns the value of <tt>repeat.times</tt>.
#
# The connection is finished in an +ensure+ clause so it is released even
# when a query raises. PG::Connection is used instead of the legacy PGconn
# constant.
def single(repeat=1)
  rowcount = 0
  conn = PG::Connection.new
  begin
    conn.query('select count(*) from resources') do |result|
      # Only read by the disabled sanity check below.
      rowcount = result.getvalue(0, 0).to_i
    end
    repeat.times do
      conn.query('select * from resources order by cdate') do |result|
        $resources = result.values
      end
    end
  ensure
    conn.finish
  end
  # raise "invalid count #{$resources.length} != #{rowcount}" if $resources.length != rowcount
end
stream_results(pg)
click to toggle source
# File benchmarks/single_row_mode.rb, line 61
# Streams the rows of TABLE_NAME over +pg+ in single-row mode.
#
# Sends <tt>select * from TABLE_NAME</tt>, switches the connection to
# single-row mode and consumes results until +get_result+ returns a false
# value. Rows are counted across results; once LIMIT_ROWS rows have been
# seen the connection is reset and iteration of the current result stops.
#
# pg:: connection object responding to +send_query+, +set_single_row_mode+,
#      +get_result+, +reset+ and +get_last_result+.
#
# Raises PG::Error when a result check or row iteration fails;
# +get_last_result+ is called on the connection before re-raising. Every
# fetched result is cleared, whether or not an error occurred.
#
# Returns nil.
def stream_results(pg)
  pg.send_query("select * from #{TABLE_NAME}")
  pg.set_single_row_mode
  rows = 0
  while (result = pg.get_result)
    begin
      result.check
      result.each do |_tuple|
        rows += 1
        if rows >= LIMIT_ROWS
          pg.reset
          break
        end
      end
    rescue PG::Error
      pg.get_last_result
      raise
    ensure
      result.clear
    end
  end
end
threads(repeat, concurrency)
click to toggle source
# File benchmarks/single_row_mode.rb, line 34
# Streams query results from +concurrency+ threads.
#
# Each thread calls +stream_results+ <tt>repeat / concurrency</tt> times on
# its own PG::Connection, which is created on first use and cached under the
# thread's index. After all threads have been joined, every cached connection
# is finished and the cache is emptied.
#
# repeat::      total number of runs, divided between the threads.
# concurrency:: number of threads to start.
#
# Returns the emptied connection cache.
def threads(repeat, concurrency)
  connections = Hash.new { |cache, slot| cache[slot] = PG::Connection.new }

  workers = concurrency.times.map do |slot|
    Thread.new do
      (repeat / concurrency).times { stream_results(connections[slot]) }
    end
  end
  workers.each(&:join)

  connections.each_value(&:finish)
  connections.clear
end