class ESReindex

Constants

DEFAULT_URL
VERSION

Attributes

logger[RW]
dclient[RW]
didx[RW]
done[RW]
dst[RW]
durl[RW]
mappings[RW]
options[RW]
sclient[RW]
settings[RW]
sidx[RW]
src[RW]
start_time[RW]
surl[RW]

Public Class Methods

copy!(src, dst, options = {}) click to toggle source
# File lib/es-reindex.rb, line 12
# Convenience entry point: builds a reindexer for +src+ -> +dst+, resolves
# the index URLs/clients, and runs the copy only when the :if/:unless guard
# callbacks allow it.
#
# Returns the reindexer instance whether or not the copy ran.
def self.copy!(src, dst, options = {})
  reindexer = new(src, dst, options)
  reindexer.setup_index_urls
  reindexer.copy! if reindexer.okay_to_proceed?
  reindexer
end
new(src, dst, options = {}) click to toggle source
# File lib/es-reindex.rb, line 26
# Builds a reindexer for the +src+ and +dst+ locations ("URL/INDEX" or a
# bare "INDEX"; nil is treated as '').
#
# +options+ is merged over the defaults below. Any callback option that is
# supplied must respond to #call, otherwise ArgumentError is raised.
# Also installs a STDERR logger on ESReindex when none is configured yet.
def initialize(src, dst, options = {})
  ESReindex.logger ||= Logger.new(STDERR)

  @src = src || ''
  @dst = dst || ''

  defaults = {
    from_cli:      false, # invoked from the command line (confirm + exit status)
    remove:        false, # delete the destination index before copying
    update:        false, # overwrite existing documents (default: only create missing ones)
    frame:         1000,  # documents fetched per scroll page
    copy_mappings: true   # copy settings/mappings from the source index
  }
  @options = defaults.merge(options)

  callback_keys = %i[if unless mappings settings before_create after_create before_each after_each after_copy]
  bad_key = callback_keys.find { |key| options.key?(key) && !options[key].respond_to?(:call) }
  raise ArgumentError, "#{bad_key} must be a callable object" if bad_key

  @done = 0
end
reindex!(src, dst, options={}) click to toggle source
# File lib/es-reindex.rb, line 19
# Like ESReindex.copy!, but the destination is never seeded with the
# source's settings/mappings: copy_mappings is forced to false, so they
# come from the optional :settings / :mappings callables instead.
#
# The caller's +options+ hash is not modified. Returns the reindexer.
def self.reindex!(src, dst, options={})
  reindexer = new(src, dst, options.merge(copy_mappings: false))
  reindexer.setup_index_urls
  reindexer.copy! if reindexer.okay_to_proceed?
  reindexer
end

Public Instance Methods

check_docs() click to toggle source
# File lib/es-reindex.rb, line 183
# Polls the source and destination document counts once per second until
# they match, giving up after 60 seconds.
#
# Logs the last observed counts and returns true when they are equal.
def check_docs
  log 'Checking document count... '
  scount = 1
  dcount = 0
  begin
    Timeout.timeout(60) do
      loop do
        scount = sclient.count(index: sidx)["count"]
        dcount = dclient.count(index: didx)["count"]
        break if scount == dcount
        sleep 1
      end
    end
  rescue Timeout::Error
    # Best effort: after a minute, report whatever counts were last seen.
  end
  counts_match = scount == dcount
  log "Document count: #{scount} = #{dcount} (#{counts_match ? 'equal' : 'NOT EQUAL'})"

  counts_match
end
clear_destination() click to toggle source
# File lib/es-reindex.rb, line 97
# Deletes the destination index when the :remove option is set and the
# index exists.
#
# Returns true on success (or when nothing needed deleting) and false if
# any request raised. The error is logged rather than silently discarded,
# so a failing copy! can be diagnosed.
def clear_destination
  dclient.indices.delete(index: didx) if remove? && dclient.indices.exists(index: didx)
  true
rescue => e
  log "Failed to clear destination index '#{durl}/#{didx}': #{e.message}", :error
  false
end
confirm() click to toggle source
# File lib/es-reindex.rb, line 92
# Prompts the operator (the prompt text says Ctrl-c aborts) and blocks
# until a line is read from $stdin.
#
# Returns the line that was read.
def confirm
  $stdout.write("Confirm or hit Ctrl-c to abort...\n")
  $stdin.readline
end
copy!() click to toggle source
# File lib/es-reindex.rb, line 74
# Runs the full pipeline: clear the destination, create it, copy the
# documents, then verify the counts. Each stage runs only if the previous
# one succeeded.
#
# From the CLI the operator is asked to confirm first, and the process
# exits with status 0/1. Otherwise the success flag is returned.
def copy!
  mode_note =
    if remove?
      ' with rewriting destination mapping!'
    elsif update?
      ' with updating existing documents!'
    else
      '.'
    end
  log "Copying '#{surl}/#{sidx}' to '#{durl}/#{didx}'#{mode_note}"
  confirm if from_cli?

  success = clear_destination && create_destination && copy_docs && check_docs
  return success unless from_cli?

  exit(success ? 0 : 1)
end
copy_docs() click to toggle source
# File lib/es-reindex.rb, line 146
# Streams every document from the source index into the destination index.
#
# Pages through the source with a scan/scroll search (+frame+ docs per
# page) and writes each page with one bulk request. The bulk action is
# 'index' when update? is set and 'create' otherwise. The :before_each /
# :after_each callbacks wrap every document, and :after_copy fires once at
# the end. Progress (with an E.T.A.) is logged after every page.
#
# Returns true once the scroll is exhausted.
def copy_docs
  log "Copying '#{surl}/#{sidx}' to '#{durl}/#{didx}'..."
  @start_time = Time.now

  scroll = sclient.search index: sidx, search_type: "scan", scroll: '10m', size: frame
  total = scroll['hits']['total']
  log "Copy progress: %u/%u (%.1f%%) done.\r" % [done, total, 0]

  action = update? ? 'index' : 'create'

  loop do
    # Always continue from the most recent response's scroll id.
    scroll = sclient.scroll(scroll_id: scroll['_scroll_id'], scroll: '10m')
    break if !scroll || scroll['hits']['hits'].empty?

    bulk = scroll['hits']['hits'].map do |doc|
      # Callbacks were validated as callables in initialize.
      options[:before_each]&.call
      ### === implement possible modifications to the document
      ### === end modifications to the document
      entry = { action => { '_index' => didx, '_id' => doc['_id'], '_type' => doc['_type'], data: doc['_source'] } }
      @done = done + 1
      options[:after_each]&.call
      entry
    end
    dclient.bulk body: bulk unless bulk.empty?

    eta = total * (Time.now - start_time) / done
    # Format every value through format() so runtime text is never
    # re-parsed as format directives.
    log format('Copy progress: %s/%s (%.1f%%) done in %s. E.T.A.: %s.',
               done, total, 100.0 * done / total, tm_len, start_time + eta)
  end

  log "Copy progress: %u/%u done in %s.\n" % [done, total, tm_len]

  options[:after_copy]&.call

  true
end
create_destination() click to toggle source
# File lib/es-reindex.rb, line 104
# Creates the destination index unless it already exists.
#
# With copy_mappings? the settings and mappings are read from the source
# index, and false is returned if either lookup fails. Otherwise they come
# from the optional :settings / :mappings callables, defaulting to {}. The
# :before_create / :after_create callbacks wrap the create request.
#
# Returns true when the index already existed or was created.
def create_destination
  return true if dclient.indices.exists(index: didx)

  if copy_mappings?
    return false unless get_settings
    return false unless get_mappings
    create_msg = " with settings & mappings from '#{surl}/#{sidx}'"
  else
    @mappings = options[:mappings].nil? ? {} : options[:mappings].call
    @settings = options[:settings].nil? ? {} : options[:settings].call
    create_msg = ""
  end

  # Callbacks were validated as callables in initialize.
  options[:before_create]&.call

  log "Creating '#{durl}/#{didx}' index#{create_msg}..."
  dclient.indices.create index: didx, body: { settings: settings, mappings: mappings }
  log "Successfully created '#{durl}/#{didx}'#{create_msg}."

  options[:after_create]&.call

  true
end
get_mappings() click to toggle source
# File lib/es-reindex.rb, line 138
# Reads the source index mappings into @mappings. fetch_index_config
# resolves the response when the source name is an alias.
#
# Returns the mappings, or false after logging an error when the request
# returned nothing.
def get_mappings
  response = sclient.indices.get_mapping(index: sidx)
  if response
    @mappings = fetch_index_config(response, sidx)["mappings"]
  else
    log "Failed to obtain original index '#{surl}/#{sidx}' mappings!", :error
    false
  end
end
get_settings() click to toggle source
# File lib/es-reindex.rb, line 128
# Reads the source index settings into @settings, resolving aliases via
# fetch_index_config, and strips index.version.created so the settings can
# be reused to create the destination. (Presumably ES rejects that
# read-only value on create; confirm.)
#
# Returns true on success, or false after logging an error when the
# request returned nothing. Previously it returned Hash#delete's result,
# which was nil whenever "created" was absent. create_destination treated
# that as failure and silently aborted.
def get_settings
  response = sclient.indices.get_settings(index: sidx)
  unless response
    log "Failed to obtain original index '#{surl}/#{sidx}' settings!", :error
    return false
  end

  @settings = fetch_index_config(response, sidx)["settings"]
  version = @settings.dig("index", "version")
  version.delete("created") if version
  true
end
okay_to_proceed?() click to toggle source
# File lib/es-reindex.rb, line 66
# Evaluates the optional guard callbacks, each called with the source and
# destination clients.
#
# Proceeds when :if (if given) returns truthy and :unless (if given)
# returns falsy. :unless is not called once :if has already refused.
# Logs a notice when skipping, and returns the guard result.
def okay_to_proceed?
  verdict = options.key?(:if) ? options[:if].call(sclient, dclient) : true
  if options.key?(:unless)
    verdict = verdict && !options[:unless].call(sclient, dclient)
  end
  log 'Skipping action due to guard callbacks' unless verdict
  verdict
end
setup_index_urls() click to toggle source
# File lib/es-reindex.rb, line 50
# Splits the src/dst locations into base URL and index name and builds an
# Elasticsearch client for each side.
#
# "URL/INDEX" is split at the last slash. Anything without a slash is
# treated as a bare index name on DEFAULT_URL. The pattern is anchored to
# the whole string (\A...\z), not to a single line (^...$).
def setup_index_urls
  split_location = lambda do |param|
    match = %r{\A(.*)/(.*?)\z}.match(param)
    match ? [match[1], match[2]] : [DEFAULT_URL.dup, param.dup]
  end

  @surl, @sidx = split_location.call(src)
  @durl, @didx = split_location.call(dst)

  @sclient = Elasticsearch::Client.new host: @surl
  @dclient = Elasticsearch::Client.new host: @durl
end

Private Instance Methods

copy_mappings?() click to toggle source
# File lib/es-reindex.rb, line 228
# Whether create_destination copies settings and mappings from the source
# index (the :copy_mappings option; reindex! forces it to false).
def copy_mappings?
  options[:copy_mappings]
end
fetch_index_config(config, index) click to toggle source

Accounts for aliased indices. When the requested index name is an alias, neither the settings response nor the mappings response contains a key for it; instead, each contains a single key named after the original (concrete) index.

# File lib/es-reindex.rb, line 248
# Returns the per-index entry of a settings/mappings response.
#
# The entry is normally keyed by +index+. When +index+ is an alias, the
# response is keyed by the concrete index instead, so the first (only)
# value is used.
def fetch_index_config(config, index)
  return config[index] if config.key?(index)

  config.values.first
end
frame() click to toggle source
# File lib/es-reindex.rb, line 220
# Number of documents requested per scroll page (the :frame option;
# initialize defaults it to 1000).
def frame
  options[:frame]
end
from_cli?() click to toggle source
# File lib/es-reindex.rb, line 224
# Whether copy! runs in CLI mode: it asks for confirmation first and exits
# the process with the result status (the :from_cli option).
def from_cli?
  options[:from_cli]
end
log(msg, level = :info) click to toggle source
# File lib/es-reindex.rb, line 208
# Sends +msg+ to the class-wide ESReindex.logger at the given +level+
# (a logger method name such as :info or :error).
def log(msg, level = :info)
  target = ESReindex.logger
  target.public_send(level, msg)
end
remove?() click to toggle source
# File lib/es-reindex.rb, line 212
# Whether clear_destination deletes an existing destination index before
# copying (the :remove option).
def remove?
  options[:remove]
end
tm_len() click to toggle source
# File lib/es-reindex.rb, line 232
# Formats the wall-clock time elapsed since @start_time as "H:MM:SS",
# prefixed with "N days, " when at least one whole day has passed.
def tm_len
  remaining = Time.now - @start_time
  # Peel off days, hours and minutes in turn; what is left is seconds.
  days, hours, minutes = [86400, 3600, 60].map do |unit|
    whole = remaining / unit
    remaining %= unit
    whole
  end
  day_text = sprintf('%u', days)
  prefix = day_text == '0' ? '' : "#{day_text} days, "
  prefix + sprintf('%u:%02u:%02u', hours, minutes, remaining)
end
update?() click to toggle source
# File lib/es-reindex.rb, line 216
# Whether copy_docs writes documents with the bulk 'index' action
# (overwriting existing ones) instead of 'create' (the :update option).
def update?
  options[:update]
end