class Object

Constants

LIMIT_ROWS
TABLE_NAME

Public Instance Methods

benchmark(repeat=100) click to toggle source
# File benchmarks/em_pg.rb, line 9
# Runs the whole benchmark suite and prints a Benchmark.bm report.
#
# Order of the reports: one `single` run, then `parallel` for every
# chunk-size/concurrency pair, then the same pairs again after
# `patch_blocking` has been applied. `patch_remove_blocking` is called after
# the last report.
#
# repeat - passed through unchanged as the first argument of `single` and
#          `parallel`.
#
# Returns whatever Benchmark.bm returns.
def benchmark(repeat=100)
  # [chunk_size, concurrency] pairs used for both report groups.
  configurations = [[90000, 1], [5000, 5], [2000, 10], [1000, 20]]

  Benchmark.bm(20) do |bench|
    bench.report('single:') { single(repeat) }
    puts

    configurations.each do |chunk_size, concurrency|
      bench.report("parallel #{chunk_size}/#{concurrency}:") do
        parallel(repeat, chunk_size, concurrency)
      end
    end
    puts

    patch_blocking
    configurations.each do |chunk_size, concurrency|
      bench.report("blocking #{chunk_size}/#{concurrency}:") do
        parallel(repeat, chunk_size, concurrency)
      end
    end
    patch_remove_blocking
  end
end
fibers(repeat, concurrency) click to toggle source
# File benchmarks/single_row_mode.rb, line 46
# Streams the benchmark query from fibers inside an EM.synchrony block.
#
# A lazy PG::EM::ConnectionPool of `concurrency` connections is created, and
# a FiberIterator over 0...concurrency runs one block per index. Each block
# holds a pooled connection and calls `stream_results` on it
# repeat / concurrency times (integer division). The pool is then finished
# and the reactor stopped.
#
# repeat      - total number of streaming runs to spread over the fibers.
# concurrency - pool size and iterator concurrency.
def fibers(repeat, concurrency)
  EM.synchrony do
    pool = PG::EM::ConnectionPool.new(size: concurrency, lazy: true)
    worker_ids = (0...concurrency)

    FiberIterator.new(worker_ids, concurrency).each do
      pool.hold do |connection|
        runs_per_fiber = repeat / concurrency
        runs_per_fiber.times { stream_results(connection) }
      end
    end

    pool.finish
    EM.stop
  end
end
parallel(repeat=1, chunk_size=2000, concurrency=10) click to toggle source

Retrieve resources using parallel queries.

# File benchmarks/em_pg.rb, line 71
# Loads every row of the `resources` table in chunks, using a
# PG::EM::ConnectionPool inside an EM.synchrony block.
#
# The row count is queried once; from it the chunk offsets
# (0, chunk_size, 2 * chunk_size, ...) are derived. For each of the `repeat`
# passes an EM::Synchrony::FiberIterator walks the offsets and issues one
# `limit $1 offset $2` query per offset, storing the returned rows at the
# matching position of the result array.
#
# repeat      - number of times the whole table is fetched.
# chunk_size  - number of rows requested per query.
# concurrency - pool size and iterator concurrency.
#
# Returns the Array of row values collected by the chunk queries.
def parallel(repeat=1, chunk_size=2000, concurrency=10)
  resources = []
  rowcount = 0
  EM.synchrony do
    pool = PG::EM::ConnectionPool.new size: concurrency
    pool.query('select count(*) from resources') do |result|
      rowcount = result.getvalue(0,0).to_i
    end
    # Float division + ceil so that a trailing partial chunk gets its own offset.
    offsets = (rowcount / chunk_size.to_f).ceil.times.map { |n| n * chunk_size }
    repeat.times do
      EM::Synchrony::FiberIterator.new(offsets, concurrency).each do |offset|
        pool.query('select * from resources order by cdate limit $1 offset $2', [chunk_size, offset]) do |result|
          # Slice assignment keeps rows in offset order regardless of the
          # order in which the chunk queries complete.
          resources[offset, chunk_size] = result.values
        end
      end
    end
    # Close the pooled connections before stopping the reactor; otherwise
    # every call leaves `concurrency` connections open.
    pool.finish
    EM.stop
  end
  # raise "invalid count #{resources.length} != #{rowcount}" if resources.length != rowcount
  # raise "resources != $resources" if resources != $resources
  resources
end
patch_blocking() click to toggle source
# File benchmarks/em_pg.rb, line 35
# Monkey-patches the PostgreSQL client classes so that the watcher collects
# results through PG::Connection#get_last_result.
#
# The original Watcher#fetch_results is kept as original_fetch_results so
# that patch_remove_blocking can restore it.
#
# NOTE: calling this twice without patch_remove_blocking in between would
# re-alias original_fetch_results to the already patched method.
def patch_blocking
  # Expose PG::Connection#get_last_result under a second name.
  # NOTE(review): presumably this is the blocking implementation at the time
  # of the call — confirm against the loaded pg / em-pg-client versions.
  PG::Connection.class_eval <<-EOE
    alias_method :blocking_get_last_result, :get_last_result
  EOE
  # Save the current fetch_results, then replace it with a version that turns
  # off readable notifications, fetches the result via
  # @client.blocking_get_last_result and passes either the result or the
  # raised exception to @deferrable. notify_readable is re-aliased to the
  # new fetch_results.
  PG::EM::Client::Watcher.module_eval <<-EOE
    alias_method :original_fetch_results, :fetch_results
    def fetch_results
      self.notify_readable = false
      begin
        result = @client.blocking_get_last_result
      rescue Exception => e
        @deferrable.fail(e)
      else
        @deferrable.succeed(result)
      end
    end
    alias_method :notify_readable, :fetch_results
  EOE
end
patch_remove_blocking() click to toggle source
# File benchmarks/em_pg.rb, line 27
# Reverts the PG::EM::Client::Watcher changes: fetch_results and
# notify_readable are pointed back at original_fetch_results, and that saved
# alias is then undefined.
#
# Relies on original_fetch_results being defined on the Watcher. Only the
# Watcher is touched here; nothing is removed from PG::Connection.
def patch_remove_blocking
  PG::EM::Client::Watcher.module_eval <<-EOE
    alias_method :fetch_results, :original_fetch_results
    alias_method :notify_readable, :fetch_results
    undef :original_fetch_results
  EOE
end
single(repeat=1) click to toggle source

Retrieve resources using a single select query.

# File benchmarks/em_pg.rb, line 56
# Loads the whole `resources` table with one plain select per pass.
#
# The row count is queried once (it is only used by the disabled sanity
# check below); then the full select is run `repeat` times and the rows of
# the latest pass are stored in the global $resources.
#
# repeat - number of times the full select is executed.
#
# Returns `repeat` (the value of `repeat.times`).
def single(repeat=1)
  rowcount = 0
  # PG::Connection replaces the legacy PGconn constant.
  conn = PG::Connection.new
  begin
    conn.query('select count(*) from resources') do |result|
      rowcount = result.getvalue(0,0).to_i
    end
    repeat.times do
      conn.query('select * from resources order by cdate') do |result|
        $resources = result.values
      end
    end
    # raise "invalid count #{$resources.length} != #{rowcount}" if $resources.length != rowcount
  ensure
    # Close the connection even when a query fails, so none is left open.
    conn.finish
  end
end
stream_results(pg) click to toggle source
# File benchmarks/single_row_mode.rb, line 61
# Sends `select * from TABLE_NAME` on the given connection, switches it to
# single-row mode and consumes the results one by one, counting rows.
#
# Once LIMIT_ROWS rows have been counted the connection is reset and
# iteration of the current result stops. Every fetched result is cleared,
# whether or not an error occurred.
#
# pg - connection object responding to send_query, set_single_row_mode,
#      get_result, reset and get_last_result.
#
# Raises PG::Error when a result check fails (after calling
# pg.get_last_result).
# Returns nil.
def stream_results(pg)
  pg.send_query("select * from #{TABLE_NAME}")
  pg.set_single_row_mode
  rows = 0
  while (result = pg.get_result)
    begin
      result.check
      result.each do |_tuple|
        rows += 1
        if rows >= LIMIT_ROWS
          pg.reset
          # Leaves only `result.each`; the outer loop ends once get_result
          # returns nil — presumably right after the reset, TODO confirm.
          break
        end
      end
    rescue PG::Error => e
      # NOTE(review): presumably drains the remaining results so the
      # connection stays usable — confirm.
      pg.get_last_result
      raise e
    ensure
      result.clear
    end
  end
end
threads(repeat, concurrency) click to toggle source
# File benchmarks/single_row_mode.rb, line 34
# Streams the benchmark query from `concurrency` threads.
#
# Each thread runs `stream_results` repeat / concurrency times (integer
# division) on its own PG::Connection, which is created lazily on first use
# and keyed by the thread's index. After all threads have been joined every
# created connection is finished and the connection table is emptied.
#
# repeat      - total number of streaming runs to spread over the threads.
# concurrency - number of threads.
#
# Returns the (now empty) connection Hash.
def threads(repeat, concurrency)
  connections = Hash.new { |table, index| table[index] = PG::Connection.new }

  workers = (0...concurrency).map do |index|
    Thread.new(index) do |slot|
      runs = repeat / concurrency
      runs.times { stream_results(connections[slot]) }
    end
  end
  workers.each(&:join)

  connections.each_value(&:finish)
  connections.clear
end