class Leda::Stores::Elasticsearch::Runner

Constants

BULK_LINES_PER_RECORD

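The number of lines each record occupies in a bulk-format records file (an action line followed by the document source), so long as they are all index ops.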

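RESTORE_BATCH_DISPATCH_TRIGGER

When restoring, a pending batch is sent to Elasticsearch as one bulk request once it holds only whole records and its size (String#size) exceeds this value.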

Attributes

directory[R]
es_client[R]
indices[R]

Public Class Methods

new(directory, indices, es_client) click to toggle source
# File lib/leda/stores/elasticsearch.rb, line 36
# Creates a runner that dumps/restores +indices+ using files under
# +directory+, talking to the cluster through +es_client+.
#
# directory - base directory for the dump files; presumably a Pathname,
#             since #join and #relative_path_from are called on it (TODO
#             confirm with callers).
# indices   - collection of index names, iterated with #each.
# es_client - Elasticsearch client used for every API call.
def initialize(directory, indices, es_client)
  @indices = indices
  @directory = directory
  @es_client = es_client
end

Public Instance Methods

dump() click to toggle source
# File lib/leda/stores/elasticsearch.rb, line 42
# Exports every configured index into +directory+. For each index, the
# mapping and settings files are written first, then the bulk-format
# records file. Progress messages go to $stderr.
def dump
  $stderr.puts "Exporting to #{echo_fn(directory)} ..."
  indices.each do |name|
    # metadata (mapping + settings) before the documents themselves
    dump_index_metadata(name)
    scan_all_records_into_bulk_format(name)
  end
  $stderr.puts "... export complete."
end
restore() click to toggle source
# File lib/leda/stores/elasticsearch.rb, line 51
# Imports every configured index from +directory+. Each index is first
# dropped and recreated from its dumped mapping/settings, then its
# records are bulk-loaded. Progress messages go to $stderr.
def restore
  $stderr.puts "Importing from #{echo_fn(directory)} ..."
  indices.each do |name|
    # recreate the empty index, then refill it
    replace_index(name)
    bulk_load_records(name)
  end
  $stderr.puts "... import complete."
end

Private Instance Methods

bulk_load_batch(batch) click to toggle source
# File lib/leda/stores/elasticsearch.rb, line 183
# Sends one chunk of newline-delimited bulk-API text to Elasticsearch.
#
# The bulk endpoint does not raise when individual operations fail. It
# sets an 'errors' flag in the response and reports each operation's
# outcome under 'items'. Without checking that, a restore would print
# "all done" even though documents were rejected. Failures are reported
# on $stderr and the restore carries on.
#
# batch - String of bulk-format lines, each terminated by "\n".
#
# Returns the client's bulk response unchanged.
def bulk_load_batch(batch)
  response = es_client.bulk body: batch
  return response unless response.respond_to?(:[]) && response['errors']

  # each item looks like { "index" => { "status" => ..., "error" => ... } }
  operations = (response['items'] || []).map { |item| item.respond_to?(:values) ? item.values.first : nil }
  failed = operations.select { |op| op.respond_to?(:[]) && op['error'] }
  # leading newline: the progress line printed by the caller ends in "\r", not "\n"
  $stderr.puts "\n    WARNING: #{failed.size} of #{operations.size} bulk operations failed"
  $stderr.puts "    first error: #{failed.first['error'].inspect}" unless failed.empty?
  response
end
bulk_load_records(index) click to toggle source
# File lib/leda/stores/elasticsearch.rb, line 154
# Streams the bulk-format records file for +index+ into Elasticsearch,
# printing progress on $stderr.
#
# Lines are accumulated into a batch. The batch is dispatched once it
# holds a whole number of records (a multiple of BULK_LINES_PER_RECORD
# lines) and its String#size exceeds RESTORE_BATCH_DISPATCH_TRIGGER.
# Whatever remains at end of file is sent as a final batch.
#
# The file is opened as UTF-8 explicitly because
# scan_all_records_into_bulk_format writes it with 'w:utf-8'. Relying on
# Encoding.default_external would mis-tag non-ASCII documents on hosts
# whose locale is not UTF-8.
#
# index - name of the index whose records file is loaded.
def bulk_load_records(index)
  fn = bulk_records_filename(index)
  $stderr.puts "  - Reading records for #{index} from #{echo_fn(fn)} "

  # first pass: count records so progress can be shown as a percentage
  line_ct = fn.open('r:utf-8') { |f| f.each_line.count }
  total_ct = line_ct / BULK_LINES_PER_RECORD

  batch = +''
  batch_line_ct = 0
  loaded_ct = 0
  fn.open('r:utf-8') do |f|
    f.each_line do |line|
      batch_line_ct += 1
      batch << line
      # only split on record boundaries, and only once the batch is big enough
      next unless batch_line_ct % BULK_LINES_PER_RECORD == 0 && batch.size > RESTORE_BATCH_DISPATCH_TRIGGER

      bulk_load_batch(batch)
      loaded_ct += batch_line_ct / BULK_LINES_PER_RECORD
      $stderr.print "\r    #{loaded_ct} / #{total_ct} => %5.1f%% done" % (loaded_ct * 100.0 / total_ct)
      batch = +''
      batch_line_ct = 0
    end
  end
  unless batch.empty?
    bulk_load_batch(batch)
    loaded_ct += batch_line_ct / BULK_LINES_PER_RECORD
  end
  $stderr.puts "\r     #{loaded_ct} / #{total_ct} =>  all done."
end
bulk_records_filename(index) click to toggle source
# File lib/leda/stores/elasticsearch.rb, line 75
# Path of the bulk-format file holding the dumped documents of +index+.
def bulk_records_filename(index)
  basename = "#{index}_bulk-records.json"
  directory.join(basename)
end
convert_to_bulk_index_rows(hit) click to toggle source
# File lib/leda/stores/elasticsearch.rb, line 119
# Turns one search hit into the two bulk-API lines that re-index it:
# an "index" action carrying the hit's _index/_type/_id (whichever are
# present), followed by the hit's _source. Returns both lines as an Array.
def convert_to_bulk_index_rows(hit)
  action = { "index" => hit.slice("_index", "_type", "_id") }
  source = hit['_source']
  [action, source].map { |doc| Oj.dump(doc) }
end
dump_index_metadata(index) click to toggle source
# File lib/leda/stores/elasticsearch.rb, line 79
# Writes both metadata files for +index+: the mapping first, then the
# settings. Returns whatever the last step (dump_settings) returns.
def dump_index_metadata(index)
  %i[dump_mapping dump_settings].map { |step| __send__(step, index) }.last
end
dump_mapping(index) click to toggle source
# File lib/leda/stores/elasticsearch.rb, line 84
# Fetches the mapping of +index+ from Elasticsearch and writes the
# response, pretty-printed as JSON, to mapping_filename(index).
def dump_mapping(index)
  target = mapping_filename(index)
  $stderr.puts "  - Dumping mapping for #{index} to #{echo_fn(target)}"
  payload = es_client.indices.get_mapping(index: index)
  target.open('w') do |io|
    io.puts(JSON.pretty_generate(payload))
  end
end
dump_settings(index) click to toggle source
# File lib/leda/stores/elasticsearch.rb, line 91
# Fetches the settings of +index+ from Elasticsearch and writes the
# response, pretty-printed as JSON, to settings_filename(index).
def dump_settings(index)
  target = settings_filename(index)
  $stderr.puts "  - Dumping settings for #{index} to #{echo_fn(target)}"
  payload = es_client.indices.get_settings(index: index)
  target.open('w') do |io|
    io.puts(JSON.pretty_generate(payload))
  end
end
echo_fn(pathname) click to toggle source
# File lib/leda/stores/elasticsearch.rb, line 62
# Formats +pathname+ for progress messages: relative to Rails.root when
# Rails is loaded and the path can be expressed relative to it, otherwise
# the path exactly as given.
#
# This is only used to build log output, so it must never be the reason
# a dump/restore fails. Without Rails the original raised NameError.
# Pathname#relative_path_from raises ArgumentError when one path is
# absolute and the other relative (or the prefixes differ).
#
# pathname - Pathname to display.
#
# Returns a Pathname.
def echo_fn(pathname)
  return pathname unless defined?(Rails) && Rails.respond_to?(:root) && Rails.root

  pathname.relative_path_from(Rails.root)
rescue ArgumentError
  pathname
end
mapping_filename(index) click to toggle source
# File lib/leda/stores/elasticsearch.rb, line 67
# Path of the JSON file holding the dumped mapping of +index+.
def mapping_filename(index)
  basename = "#{index}_mapping.json"
  directory.join(basename)
end
replace_index(index) click to toggle source
# File lib/leda/stores/elasticsearch.rb, line 126
# Drops +index+ (if it exists) and recreates it from the dumped mapping
# and settings files, so that restore's bulk load starts from an empty
# index.
#
# Each dump file is read with Oj and only the first top-level value is
# used; presumably that key is the index name reported at dump time, so a
# file covering several indices is not supported. The two values are
# merged into the create-index body, with settings winning on any duplicate
# key. A NotFound error from the delete is treated as "nothing to delete".
def replace_index(index)
  mapping_path = mapping_filename(index)
  $stderr.puts "  - Reading mapping from #{echo_fn(mapping_path)}"
  mapping_section = Oj.load(mapping_path.read).values.first # assume only one index

  settings_path = settings_filename(index)
  $stderr.puts "  - Reading settings from #{echo_fn(settings_path)}"
  settings_section = Oj.load(settings_path.read).values.first # assume only one index

  create_body = {}
  create_body.update(mapping_section)
  create_body.update(settings_section)

  $stderr.print "  - Deleting index #{index} ... "
  begin
    es_client.indices.delete index: index
    $stderr.puts "done"
  rescue ::Elasticsearch::Transport::Transport::Errors::NotFound
    $stderr.puts "not necessary"
  end

  $stderr.puts "  - Creating index #{index} using settings and mapping"
  es_client.indices.create index: index, body: create_body
end
scan_all_records_into_bulk_format(index) click to toggle source
# File lib/leda/stores/elasticsearch.rb, line 98
# Writes every document of +index+ to bulk_records_filename(index) in
# bulk-API format (via convert_to_bulk_index_rows), printing progress on
# $stderr.
#
# Documents are pulled with a 'scan' search (size 500, 5 minute
# keep-alive) followed by repeated scroll calls. Only hits returned by the
# scroll calls are written. With search_type 'scan', the initial search
# response carries the scroll id and hit total but no documents. Note that
# 'scan' was deprecated in Elasticsearch 2.1 and removed in 5.0.
#
# When the export finishes (or fails), the scroll context is released
# with clear_scroll instead of being left on the cluster until the
# keep-alive expires. A failure to release it is reported, not raised, so
# it cannot mask the export result or an exception already in flight.
#
# index - name of the index to export.
def scan_all_records_into_bulk_format(index)
  fn = bulk_records_filename(index)
  $stderr.puts "  - Dumping records for #{index} to #{echo_fn(fn)} "

  # start the scroll with a search
  results = es_client.search index: index, search_type: 'scan', scroll: '5m', size: 500
  total_ct = results['hits']['total']
  scroll_id = results['_scroll_id']

  written_ct = 0
  begin
    fn.open('w:utf-8') do |f|
      loop do
        page = es_client.scroll(scroll_id: scroll_id, scroll: '5m')
        break unless page

        # continue from (and finally clear) the id of the latest response
        scroll_id = page['_scroll_id']
        hits = page['hits']['hits']
        break if hits.empty?

        hits.each { |hit| f.puts convert_to_bulk_index_rows(hit) }
        written_ct += hits.size
        $stderr.print "\r    #{written_ct} / #{total_ct} => %5.1f%% done" % (written_ct * 100.0 / total_ct)
      end
    end
  ensure
    if scroll_id
      begin
        es_client.clear_scroll scroll_id: scroll_id
      rescue StandardError => e
        $stderr.puts "\n    could not clear scroll context: #{e.message}"
      end
    end
  end
  $stderr.puts "\r     #{written_ct} / #{total_ct} =>  all done."
end
settings_filename(index) click to toggle source
# File lib/leda/stores/elasticsearch.rb, line 71
# Path of the JSON file holding the dumped settings of +index+.
def settings_filename(index)
  basename = "#{index}_settings.json"
  directory.join(basename)
end