class Elasticsearch::Rails::HA::ParallelIndexer

Attributes

batch_size[R]
force[R]
idx_name[R]
klass[R]
max[R]
nprocs[R]
scope[R]
verbose[R]

Public Class Methods

new(opts) click to toggle source

leverage multiple cores to run indexing in parallel

# File lib/elasticsearch/rails/ha/parallel_indexer.rb, line 23
def initialize(opts)
  @klass    = opts[:klass] or fail "klass required"
  @idx_name = opts[:idx_name] or fail "idx_name required"
  @nprocs   = opts[:nprocs] or fail "nprocs required"
  @batch_size = opts[:batch_size] or fail "batch_size required"
  @max        = opts[:max]
  @force      = opts[:force]
  @verbose    = opts[:verbose]
  @scope      = opts[:scope]

  # make sure klass is not a simple string
  if @klass.is_a?(String)
    @klass = @klass.constantize
  end

  # calculate array of offsets based on nprocs
  @total_expected = klass.count
  @pool_size = (@total_expected / @nprocs.to_f).ceil
end

Public Instance Methods

blue_log(msg) click to toggle source
# File lib/elasticsearch/rails/ha/parallel_indexer.rb, line 14
def blue_log(msg)
  blue{ msg }
end
process_child_results(results) click to toggle source
# File lib/elasticsearch/rails/ha/parallel_indexer.rb, line 91
def process_child_results(results)
  # check exit status of each child so we know if we should throw exception
  results.each do |pair|
    pid = pair[0]
    pstat = pair[1]
    exit_ok = true
    if pstat.exited?
      @verbose and puts blue_log("PID #{pid} exited with #{pstat.exitstatus}")
    end
    if pstat.signaled?
      puts red_log(" >> #{pid} exited with uncaught signal #{pstat.termsig}")
      exit_ok = false
    end

    if !pstat.success?
      puts red_log(" >> #{pid} was not successful")
      exit_ok = false
    end

    if pair[1].exitstatus != 0
      puts red_log(" >> #{pid} exited with non-zero status")
      exit_ok = false
    end

    if !exit_ok
      raise red_log("PID #{pair[0]} exited abnormally, so the whole reindex fails")
    end
  end
end
red_log(msg) click to toggle source
# File lib/elasticsearch/rails/ha/parallel_indexer.rb, line 18
def red_log(msg)
  red{ msg }
end
run() click to toggle source
# File lib/elasticsearch/rails/ha/parallel_indexer.rb, line 43
def run
  return if @pool_size < 1

  # get all ids since we can't assume there are no holes in the PK sequencing
  ids = klass.order('id ASC').pluck(:id)
  offsets = []
  ids.each_slice(@pool_size) do |chunk|
    #puts "chunk: size=#{chunk.size} #{chunk.first}..#{chunk.last}"
    offsets.push( chunk.first )
  end
  if @verbose
    puts blue_log("Parallel Indexer: index=#{@idx_name} total=#{@total_expected} nprocs=#{@nprocs} pool_size=#{@pool_size} offsets=#{offsets} ")
  end

  if @force
    @verbose and puts blue_log("Force creating new index")
    klass.__elasticsearch__.create_index! force: true, index: idx_name
    klass.__elasticsearch__.refresh_index! index: idx_name
  end

  @current_db_config = ::ActiveRecord::Base.connection_config
  # IMPORTANT before forks in offsets loop
  ::ActiveRecord::Base.connection.disconnect!

  child_pids = []
  offsets.each do |start_at|
    child_pid = fork do
      run_child(start_at)
    end
    if child_pid
      child_pids << child_pid
    end
  end

  # reconnect in parent
  ::ActiveRecord::Base.establish_connection(@current_db_config)

  # Process.waitall seems to hang during tests. Do it manually.
  child_results = []

  child_pids.each do |pid|
    Process.wait(pid)
    child_results.push [pid, $?]
  end

  process_child_results(child_results)
end
run_child(start_at) click to toggle source
# File lib/elasticsearch/rails/ha/parallel_indexer.rb, line 121
def run_child(start_at)
  # IMPORTANT after fork
  ::ActiveRecord::Base.establish_connection(@current_db_config)

  # IMPORTANT for tests to determine whether at_end should run
  ENV["I_AM_HA_CHILD"] = "true"

  completed = 0
  errors    = []
  @verbose and puts blue_log("Start worker #{$$} at offset #{start_at}")
  pbar = ::ANSI::Progressbar.new("#{klass} [#{$$}]", @pool_size, STDOUT) rescue nil
  checkpoint = false
  if pbar
    win_width = pbar.__send__ :get_width
    title_width = (win_width / 4).to_i
    pbar.format("#{klass} [#{$$}]: %3d%% %s %s", :percentage, :bar, :stat)
    pbar.__send__ :show
    pbar.bar_mark = '='
  else
    checkpoint = true
  end

  @klass.__elasticsearch__.import return: 'errors',
    index: @idx_name,
    start: start_at,
    scope: @scope,
    batch_size: @batch_size    do |resp|
      # show errors immediately (rather than buffering them)
      errors += resp['items'].select { |k, v| k.values.first['error'] }
      completed += resp['items'].size
      if pbar && @verbose
        pbar.inc resp['items'].size
      end
      if checkpoint && @verbose
        puts blue_log("[#{$$}] #{Time.now.utc.iso8601} : #{completed} records completed")
      end
      STDERR.flush
      STDOUT.flush
      if errors.size > 0
        STDOUT.puts "ERRORS in #{$$}:"
        STDOUT.puts errors.pretty_inspect
      end
      if completed >= @pool_size || (@max && @max.to_i == completed)
        pbar.finish if pbar
        @verbose and puts blue_log("Worker #{$$} finished #{completed} records")
        exit!(true) # exit child worker
      end
    end # end do |resp| block
end