class Elasticsearch::IndexStager
Constants
- VERSION
Attributes
es_client[R]
index_name[R]
Public Class Methods
new(opts)
click to toggle source
# File lib/elasticsearch/index_stager.rb, line 9
#
# Stores the index name and the Elasticsearch client taken from +opts+.
#
# opts - Hash-like object read with the keys :index_name and :es_client.
#        Both values are required.
#
# Raises RuntimeError (":index_name required" / ":es_client required") when
# the corresponding value is nil or false.
def initialize(opts)
  # `||` binds tighter than assignment, so the ivar only ever receives a
  # truthy value; a missing option raises before anything is stored.
  @index_name = opts[:index_name] || raise(":index_name required")
  @es_client = opts[:es_client] || raise(":es_client required")
end
Public Instance Methods
alias_stage_to_tmp_index()
click to toggle source
# File lib/elasticsearch/index_stager.rb, line 23
#
# Points the stage alias at the tmp index.
#
# First asks the client to delete whatever is registered under
# stage_index_name; any StandardError raised by that call is swallowed, so a
# failed delete does not stop the aliasing. Then issues a single
# update_aliases request adding stage_index_name as an alias of
# tmp_index_name.
#
# Returns the result of the update_aliases call.
def alias_stage_to_tmp_index
  begin
    es_client.indices.delete(index: stage_index_name)
  rescue StandardError
    # best-effort cleanup: the delete is allowed to fail
    false
  end

  add_stage_alias = { add: { index: tmp_index_name, alias: stage_index_name } }
  es_client.indices.update_aliases(body: { actions: [add_stage_alias] })
end
promote(live_index_name=index_name)
click to toggle source
# File lib/elasticsearch/index_stager.rb, line 32
#
# Promotes the staged index so that it answers to the live name.
#
# live_index_name - name the staged index should be promoted to; defaults to
#                   index_name (nil also falls back to index_name).
#
# Steps:
# 1. Resolve the tmp index the stage alias currently points at.
# 2. Look up what currently answers to the live name. A NotFound error from
#    the client means nothing does; any other error propagates.
# 3. Every index found under the live name (other than the live name itself)
#    gets its live alias removed in the same update_aliases request and is
#    remembered for deletion afterwards.
# 4. If the lookup succeeded, rename_live_index is invoked before re-aliasing.
# 5. One update_aliases request moves the alias; the remembered indices are
#    then deleted, ignoring StandardError from individual deletes.
#
# Returns the Array of index names that were marked for deletion.
# Raises RuntimeError when the lookup returns an entry keyed by the live name
# itself.
def promote(live_index_name=index_name)
  @live_index_name = live_index_name || index_name

  # Resolve the staged target exactly once: both actions below must refer to
  # the same index, and each resolution costs a round trip to the cluster.
  staged_target = stage_aliased_to

  # the renaming actions (performed atomically by ES)
  rename_actions = [
    { remove: { index: staged_target, alias: stage_index_name } },
    { add: { index: staged_target, alias: @live_index_name } }
  ]

  # zap any existing index known as the live name,
  # but do it conditionally since it is reasonable that it does not exist.
  to_delete = []
  live_index_exists = false
  begin
    existing_live_index = es_client.indices.get_alias(index: @live_index_name, name: '*')
    live_index_exists = true
  rescue Elasticsearch::Transport::Transport::Errors::NotFound
    existing_live_index = {}
  end

  existing_live_index.each_key do |aliased_index|
    if aliased_index == @live_index_name
      raise "Found existing index called #{@live_index_name} aliased to itself"
    end

    # the index is merely aliased: remove its alias as part of the aliasing
    # transaction, and delete it once the aliases were updated successfully.
    rename_actions.unshift({ remove: { index: aliased_index, alias: @live_index_name } })
    to_delete.push aliased_index
  end

  rename_live_index if live_index_exists

  # re-alias
  es_client.indices.update_aliases body: { actions: rename_actions }

  # clean up; a failed delete must not abort the remaining ones
  to_delete.each do |idxname|
    begin
      es_client.indices.delete index: idxname
    rescue StandardError
      false
    end
  end
end
stage_index_name()
click to toggle source
# File lib/elasticsearch/index_stager.rb, line 14
#
# Name used for the stage alias: index_name followed by "_staged".
def stage_index_name
  staged_suffix = "_staged"
  index_name + staged_suffix
end
tmp_index_name()
click to toggle source
# File lib/elasticsearch/index_stager.rb, line 18
#
# Name of the tmp index for this stager instance.
#
# The suffix is a 14-digit timestamp (Time.now formatted as %Y%m%d%H%M%S),
# a dash, and 8 random hex characters. It is memoized in @_suffix, so
# repeated calls on the same instance return the same name.
#
# Returns a String of the form "<index_name>_<timestamp>-<hex>".
def tmp_index_name
  # hex(4) yields exactly 8 hex characters; no need to generate 32 and slice.
  @_suffix ||= "#{Time.now.strftime('%Y%m%d%H%M%S')}-#{SecureRandom.hex(4)}"
  "#{index_name}_#{@_suffix}"
end
Private Instance Methods
find_newest_alias_for(the_index_name)
click to toggle source
# File lib/elasticsearch/index_stager.rb, line 123
#
# Finds the newest tmp index among the entries the client returns for
# +the_index_name+.
#
# the_index_name - name passed as :index to indices.get_alias (with name '*').
#
# Only keys matching tmp_index_pattern are considered; they are ranked by the
# timestamp captured in the pattern's first group, parsed with Time.parse.
# When two candidates carry the same timestamp, the one listed first wins.
#
# Returns the key (String) with the most recent timestamp.
# Raises RuntimeError when no key matches tmp_index_pattern.
def find_newest_alias_for(the_index_name)
  aliases = es_client.indices.get_alias(index: the_index_name, name: '*')
  pattern = tmp_index_pattern

  # pair each key with its match so every name is matched exactly once
  candidates = aliases.keys
                      .map { |name| [name, name.match(pattern)] }
                      .select { |_name, matched| matched }

  # max_by keeps the first of equally-ranked elements
  newest = candidates.max_by { |_name, matched| Time.parse(matched[1]) }

  raise "Cannot identify index aliased to by '#{the_index_name}'" unless newest

  newest.first
end
rename_live_index()
click to toggle source
# File lib/elasticsearch/index_stager.rb, line 80
#
# "Renames" the index stored in @live_index_name by copying it to
# "<live name>-pre-staged-original" with the client's reindex call and then
# deleting the original.
#
# If the stats call reports zero documents for the live index, it is simply
# deleted and nothing is copied.
#
# After reindexing, the copy is looked up in indices.get_aliases up to 10
# times, pausing one second between attempts.
#
# Returns nil when the empty live index was deleted, otherwise the result of
# the final delete call.
# Raises RuntimeError when the copy is not listed after the last attempt.
def rename_live_index
  # if the live index exists but is empty, it will fail to reindex (rename)
  # so catch that case and just delete it.
  begin
    es_client.indices.refresh index: @live_index_name
  rescue StandardError
    # refresh is best-effort; the stats call below still runs
    false
  end
  idx_stats = es_client.indices.stats index: @live_index_name, docs: true
  num_docs = idx_stats['indices'][@live_index_name]['total']['docs']['count']
  if num_docs.to_i.zero?
    es_client.indices.delete index: @live_index_name
    return
  end

  new_name = @live_index_name + '-pre-staged-original'
  renamed = { source: { index: @live_index_name }, dest: { index: new_name } }

  # make a copy
  es_client.reindex refresh: true, wait_for_completion: true, body: renamed

  # make sure the copy exists before we delete the original
  max_tries = 10
  rename_ok = false
  max_tries.times do |attempt|
    begin
      es_client.indices.refresh index: new_name
    rescue StandardError
      false
    end
    rename_ok = es_client.indices.get_aliases.key?(new_name)
    break if rename_ok

    # no point in waiting once the final attempt has failed
    sleep(1) unless attempt == max_tries - 1
  end
  raise "Failed to rename #{@live_index_name} -> #{new_name}" unless rename_ok

  # delete the original
  es_client.indices.delete index: @live_index_name
end
stage_aliased_to()
click to toggle source
# File lib/elasticsearch/index_stager.rb, line 117
#
# Finds the newest tmp index to which the stage alias points.
# We need this because we want to re-alias it.
#
# Returns whatever find_newest_alias_for returns for stage_index_name.
def stage_aliased_to
  find_newest_alias_for(stage_index_name)
end
tmp_index_pattern()
click to toggle source
# File lib/elasticsearch/index_stager.rb, line 113
#
# Regexp recognising the names produced for tmp indices:
# "<index_name>_<14 digits>-<8 word characters>".
#
# The index name is escaped so characters such as "." are matched literally,
# and the pattern is anchored to the whole string so that longer names which
# merely end in a tmp-style suffix are not mistaken for this stager's
# indices.
#
# Returns a Regexp whose first capture group is the 14-digit timestamp.
def tmp_index_pattern
  /\A#{Regexp.escape(index_name.to_s)}_(\d{14})-\w{8}\z/
end