class Elasticsearch::IndexStager

Constants

VERSION

Attributes

es_client[R]
index_name[R]

Public Class Methods

new(opts) click to toggle source
# File lib/elasticsearch/index_stager.rb, line 9
def initialize(opts)
  @index_name = opts[:index_name] or fail ":index_name required"
  @es_client = opts[:es_client] or fail ":es_client required"
end

Public Instance Methods

alias_stage_to_tmp_index() click to toggle source
# File lib/elasticsearch/index_stager.rb, line 23
def alias_stage_to_tmp_index
  es_client.indices.delete index: stage_index_name rescue false
  es_client.indices.update_aliases body: {
    actions: [
      { add: { index: tmp_index_name, alias: stage_index_name } } 
    ]   
  }   
end
promote(live_index_name=index_name) click to toggle source
# File lib/elasticsearch/index_stager.rb, line 32
def promote(live_index_name=index_name)
  @live_index_name = live_index_name || index_name

  # the renaming actions (performed atomically by ES)
  rename_actions = [ 
    { remove: { index: stage_aliased_to, alias: stage_index_name } },
    {    add: { index: stage_aliased_to, alias: @live_index_name } } 
  ]   

  # zap any existing index known as index_name,
  # but do it conditionally since it is reasonable that it does not exist.
  to_delete = []
  live_index_exists = false
  begin
    existing_live_index = es_client.indices.get_alias(index: @live_index_name, name: '*')
    live_index_exists = true
  rescue Elasticsearch::Transport::Transport::Errors::NotFound => _err
    existing_live_index = {}
  rescue => _err
    raise _err
  end
  existing_live_index.each do |k,v|

    # if the index is merely aliased, remove its alias as part of the aliasing transaction.
    if k != @live_index_name
      rename_actions.unshift({ remove: { index: k, alias: @live_index_name } })

      # mark it for deletion when we've successfully updated aliases
      to_delete.push k

    else
      raise "Found existing index called #{@live_index_name} aliased to itself"
    end
  end

  rename_live_index if live_index_exists

  # re-alias
  es_client.indices.update_aliases body: { actions: rename_actions }

  # clean up
  to_delete.each do |idxname|
    es_client.indices.delete index: idxname rescue false
  end
end
stage_index_name() click to toggle source
# File lib/elasticsearch/index_stager.rb, line 14
def stage_index_name
  index_name + "_staged"
end
tmp_index_name() click to toggle source
# File lib/elasticsearch/index_stager.rb, line 18
def tmp_index_name
  @_suffix ||= Time.now.strftime('%Y%m%d%H%M%S') + '-' + SecureRandom.hex[0..7]
  "#{index_name}_#{@_suffix}"
end

Private Instance Methods

find_newest_alias_for(the_index_name) click to toggle source
# File lib/elasticsearch/index_stager.rb, line 123
def find_newest_alias_for(the_index_name)
  aliased_to = nil
  aliases = es_client.indices.get_alias(index: the_index_name, name: '*')
  aliases.each do |k,v|
    next unless k.match(tmp_index_pattern)
    aliased_to ||= k
    alias_tstamp = aliased_to.match(tmp_index_pattern)[1]
    k_tstamp = k.match(tmp_index_pattern)[1]
    if Time.parse(alias_tstamp) < Time.parse(k_tstamp)
      aliased_to = k
    end
  end
  if !aliased_to
    raise "Cannot identify index aliased to by '#{the_index_name}'"
  end
  aliased_to
end
rename_live_index() click to toggle source
# File lib/elasticsearch/index_stager.rb, line 80
def rename_live_index
  # if the live index exists but is empty, it fill fail to reindex (rename)
  # so catch that case and just delete it.
  es_client.indices.refresh index: @live_index_name rescue false
  idx_stats = es_client.indices.stats index: @live_index_name, docs: true
  num_docs = idx_stats['indices'][@live_index_name]['total']['docs']['count']
  if num_docs.to_i == 0
    es_client.indices.delete index: @live_index_name
    return
  end

  new_name = @live_index_name + '-pre-staged-original'
  renamed = { source: { index: @live_index_name }, dest: { index: new_name } }

  # make a copy
  es_client.reindex refresh: true, wait_for_completion: true, body: renamed

  # make sure the copy exists before we delete the original
  tries = 0
  rename_ok = false
  while( tries < 10 ) do
    es_client.indices.refresh index: new_name rescue false
    indices = es_client.indices.get_aliases.keys
    break if rename_ok = indices.include?(new_name)
    tries += 1
    sleep(1)
  end
  raise "Failed to rename #{@live_index_name} -> #{new_name}" unless rename_ok

  # delete the original
  es_client.indices.delete index: @live_index_name
end
stage_aliased_to() click to toggle source
# File lib/elasticsearch/index_stager.rb, line 117
def stage_aliased_to
  # find the newest tmp index to which staged is aliased.
  # we need this because we want to re-alias it.
  aliased_to = find_newest_alias_for(stage_index_name)
end
tmp_index_pattern() click to toggle source
# File lib/elasticsearch/index_stager.rb, line 113
def tmp_index_pattern
  /#{index_name}_(\d{14})-\w{8}$/
end