class Leda::Stores::Elasticsearch::Runner
Constants
- BULK_LINES_PER_RECORD
so long as they are all index ops.
- RESTORE_BATCH_DISPATCH_TRIGGER
Attributes
directory[R]
es_client[R]
indices[R]
Public Class Methods
new(directory, indices, es_client)
click to toggle source
# File lib/leda/stores/elasticsearch.rb, line 36
# Stores the three collaborators the runner works with; no I/O happens here.
#
# directory:: where dump files are written to / read from
# indices::   the index names to process
# es_client:: the client object every Elasticsearch request goes through
def initialize(directory, indices, es_client)
  @directory, @indices, @es_client = directory, indices, es_client
end
Public Instance Methods
dump()
click to toggle source
# File lib/leda/stores/elasticsearch.rb, line 42
# Exports every configured index into +directory+: index metadata first,
# then all records in bulk format. Progress is reported on stderr.
def dump
  destination = echo_fn(directory)
  $stderr.puts "Exporting to #{destination} ..."

  indices.each do |index_name|
    dump_index_metadata(index_name)
    scan_all_records_into_bulk_format(index_name)
  end

  $stderr.puts "... export complete."
end
restore()
click to toggle source
# File lib/leda/stores/elasticsearch.rb, line 51
# Imports every configured index from +directory+: each index is replaced
# (deleted and recreated) and then its records are bulk loaded.
# Progress is reported on stderr.
def restore
  origin = echo_fn(directory)
  $stderr.puts "Importing from #{origin} ..."

  indices.each do |index_name|
    replace_index(index_name)
    bulk_load_records(index_name)
  end

  $stderr.puts "... import complete."
end
Private Instance Methods
bulk_load_batch(batch)
click to toggle source
# File lib/leda/stores/elasticsearch.rb, line 183
# Sends one batch of bulk-format lines to Elasticsearch.
#
# A bulk request can succeed as a whole while individual items fail; the
# response then carries a truthy "errors" flag. Such failures used to pass
# unnoticed, so they are now reported on stderr. The load is not aborted.
#
# batch:: a String of newline-terminated bulk API lines
#
# Returns whatever the client's +bulk+ call returned.
def bulk_load_batch(batch)
  response = es_client.bulk body: batch

  if response.is_a?(Hash) && response['errors']
    # assumes each item looks like { "index" => { ..., "error" => ... } },
    # as in the Elasticsearch bulk response format -- TODO confirm for the
    # server version in use
    items = response['items'] || []
    failed_ct = items.count do |item|
      item.is_a?(Hash) && item.values.any? { |result| result.is_a?(Hash) && result['error'] }
    end
    # leading newline: the progress display rewrites its line using "\r"
    $stderr.puts "\n ! Bulk load reported failures for #{failed_ct} of #{items.size} item(s) in this batch"
  end

  response
end
bulk_load_records(index)
click to toggle source
# File lib/leda/stores/elasticsearch.rb, line 154
# Streams the bulk-records file for +index+ into Elasticsearch in batches.
#
# The file is read twice: once to count lines (for the progress display) and
# once to load. A batch is dispatched only on a record boundary (every
# BULK_LINES_PER_RECORD lines) and only once its accumulated size exceeds
# RESTORE_BATCH_DISPATCH_TRIGGER; whatever remains at the end is sent as a
# final batch.
def bulk_load_records(index)
  records_file = bulk_records_filename(index)
  $stderr.puts " - Reading records for #{index} from #{echo_fn(records_file)} "

  # counting pass
  line_total = 0
  records_file.each_line { line_total += 1 }
  record_total = line_total / BULK_LINES_PER_RECORD

  pending = ""
  pending_line_ct = 0
  records_done = 0

  # loading pass
  records_file.each_line do |line|
    pending_line_ct += 1
    pending << line

    # never split a record across two batches
    next unless pending_line_ct % BULK_LINES_PER_RECORD == 0
    # keep accumulating until the batch is worth sending
    next unless pending.size > RESTORE_BATCH_DISPATCH_TRIGGER

    bulk_load_batch(pending)
    records_done += pending_line_ct / BULK_LINES_PER_RECORD
    percent_done = records_done * 100.0 / record_total
    $stderr.print "\r #{records_done} / #{record_total} => %5.1f%% done" % percent_done

    pending = ""
    pending_line_ct = 0
  end

  # flush the tail that never reached the dispatch trigger
  unless pending.empty?
    bulk_load_batch(pending)
    records_done += pending_line_ct / BULK_LINES_PER_RECORD
  end

  $stderr.puts "\r #{records_done} / #{record_total} => all done."
end
bulk_records_filename(index)
click to toggle source
# File lib/leda/stores/elasticsearch.rb, line 75
# Location, inside +directory+, of the bulk-format records file for +index+.
def bulk_records_filename(index)
  basename = "#{index}_bulk-records.json"
  directory.join(basename)
end
convert_to_bulk_index_rows(hit)
click to toggle source
# File lib/leda/stores/elasticsearch.rb, line 119
# Turns one search hit into the two rows written to the bulk file for it:
# an "index" action row built from the hit's _index/_type/_id, followed by
# the serialized _source.
#
# Returns a two-element Array of JSON strings.
def convert_to_bulk_index_rows(hit)
  action_row = Oj.dump({ "index" => hit.slice("_index", "_type", "_id") })
  source_row = Oj.dump(hit['_source'])

  [action_row, source_row]
end
dump_index_metadata(index)
click to toggle source
# File lib/leda/stores/elasticsearch.rb, line 79
# Writes the metadata files for +index+: its mapping, then its settings.
def dump_index_metadata(index)
  dump_mapping index
  dump_settings index
end
dump_mapping(index)
click to toggle source
# File lib/leda/stores/elasticsearch.rb, line 84
# Fetches the mapping of +index+ from Elasticsearch and writes it, as
# pretty-printed JSON, to the index's mapping file.
def dump_mapping(index)
  target = mapping_filename(index)
  $stderr.puts " - Dumping mapping for #{index} to #{echo_fn(target)}"

  mapping = es_client.indices.get_mapping(index: index)
  target.open('w') do |io|
    io.puts JSON.pretty_generate(mapping)
  end
end
dump_settings(index)
click to toggle source
# File lib/leda/stores/elasticsearch.rb, line 91
# Fetches the settings of +index+ from Elasticsearch and writes them, as
# pretty-printed JSON, to the index's settings file.
def dump_settings(index)
  target = settings_filename(index)
  $stderr.puts " - Dumping settings for #{index} to #{echo_fn(target)}"

  settings = es_client.indices.get_settings(index: index)
  target.open('w') do |io|
    io.puts JSON.pretty_generate(settings)
  end
end
echo_fn(pathname)
click to toggle source
# File lib/leda/stores/elasticsearch.rb, line 62
# Produces the form of +pathname+ shown in progress messages.
#
# Inside a Rails application the path is shown relative to Rails.root.
# This method is display-only, so it never fails the dump/restore: when
# Rails is not loaded, has no root, or the path cannot be expressed relative
# to it (relative_path_from raises ArgumentError, e.g. for a relative
# +pathname+ against an absolute root), +pathname+ is returned as given.
#
# pathname:: a Pathname
#
# Returns a Pathname.
def echo_fn(pathname)
  # TODO: an alternative base for non-Rails use (e.g. the working directory)
  app_root = defined?(Rails) && Rails.respond_to?(:root) ? Rails.root : nil
  return pathname unless app_root

  pathname.relative_path_from(app_root)
rescue ArgumentError
  pathname
end
mapping_filename(index)
click to toggle source
# File lib/leda/stores/elasticsearch.rb, line 67
# Location, inside +directory+, of the mapping file for +index+.
def mapping_filename(index)
  basename = "#{index}_mapping.json"
  directory.join(basename)
end
replace_index(index)
click to toggle source
# File lib/leda/stores/elasticsearch.rb, line 126
# Recreates +index+ from its dumped mapping and settings files.
#
# Both files are read first; the index is then deleted (a missing index is
# not an error) and created again with the combined mapping + settings body.
def replace_index(index)
  mapping_file = mapping_filename(index)
  $stderr.puts " - Reading mapping from #{echo_fn(mapping_file)}"
  # the file is keyed by index name; assume only one index
  index_mappings = Oj.load(mapping_file.read).values.first

  settings_file = settings_filename(index)
  $stderr.puts " - Reading settings from #{echo_fn(settings_file)}"
  # the file is keyed by index name; assume only one index
  index_settings = Oj.load(settings_file.read).values.first

  creation_body = {}
  creation_body.merge!(index_mappings)
  creation_body.merge!(index_settings)

  begin
    $stderr.print " - Deleting index #{index} ... "
    es_client.indices.delete(index: index)
    $stderr.puts "done"
  rescue ::Elasticsearch::Transport::Transport::Errors::NotFound
    # nothing to delete
    $stderr.puts "not necessary"
  end

  $stderr.puts " - Creating index #{index} using settings and mapping"
  es_client.indices.create(index: index, body: creation_body)
end
scan_all_records_into_bulk_format(index)
click to toggle source
# File lib/leda/stores/elasticsearch.rb, line 98
# Writes every record of +index+ to its bulk-records file.
#
# A scan search opens a scroll; pages are then fetched until one comes back
# with no hits. Each hit is written as the rows produced by
# convert_to_bulk_index_rows. Progress is reported on stderr.
#
# The scroll context is released when the method finishes, including when
# it is left through an exception. Previously it stayed open on the server
# until its 5m timeout ran out.
def scan_all_records_into_bulk_format(index)
  fn = bulk_records_filename(index)
  $stderr.puts " - Dumping records for #{index} to #{echo_fn(fn)} "

  # start the scroll with a search
  results = es_client.search index: index, search_type: 'scan', scroll: '5m', size: 500
  scroll_id = results['_scroll_id']
  # presumably an Integer count here -- it is only interpolated and divided by
  total_ct = results['hits']['total']

  written_ct = 0
  begin
    fn.open('w:utf-8') do |f|
      loop do
        results = es_client.scroll(scroll_id: scroll_id, scroll: '5m')
        break unless results

        # always continue with (and later release) the most recent id
        scroll_id = results['_scroll_id'] || scroll_id
        hits = results['hits']['hits']
        break if hits.empty?

        hits.each { |hit| f.puts convert_to_bulk_index_rows(hit) }
        written_ct += hits.size
        $stderr.print "\r #{written_ct} / #{total_ct} => %5.1f%% done" % (written_ct * 100.0 / total_ct)
      end
    end
  ensure
    release_scroll_context(scroll_id)
  end
  $stderr.puts "\r #{written_ct} / #{total_ct} => all done."
end

# Best-effort release of a scroll context. Failures are deliberately ignored:
# this is cleanup only, and an unreleased context expires on its own once its
# scroll timeout passes.
def release_scroll_context(scroll_id)
  return unless scroll_id && es_client.respond_to?(:clear_scroll)

  es_client.clear_scroll scroll_id: scroll_id
rescue StandardError
  nil
end
settings_filename(index)
click to toggle source
# File lib/leda/stores/elasticsearch.rb, line 71
# Location, inside +directory+, of the settings file for +index+.
def settings_filename(index)
  basename = "#{index}_settings.json"
  directory.join(basename)
end