module ElasticUtil
This module provides a way to backup and restore Elasticsearch data.
@example Backup data from one elasticsearch cluster and restore it to another.
ElasticUtil.backup('http://localhost:9300', '/tmp/local-elastic-data', {size: 5000})
ElasticUtil.restore('http://localhost:9301', '/tmp/local-elastic-data')
Constants
- DUMP_DIR
The name of the data directory, relative to the user provided backup directory.
- VERSION
Public Class Methods
# File lib/elastic_util.rb, line 320
# Perform an HTTP GET request against the given uri.
#
# @param [URI, String] uri The request URI.
# @param [Hash] opts Options forwarded to exec_request (eg. :headers, :read_timeout).
# @return [Net::HTTPResponse]
def self.api_get(uri, opts={})
  # Bug fix: the original passed `opts={}` — an assignment that replaced the
  # caller's options with a fresh empty hash — so opts was always discarded.
  exec_request(uri, "GET", opts)
end
# File lib/elastic_util.rb, line 324
# Perform an HTTP POST request against the given uri with the given payload.
#
# @param [URI, String] uri The request URI.
# @param [String] payload The request body.
# @param [Hash] opts Additional options forwarded to exec_request.
# @return [Net::HTTPResponse]
def self.api_post(uri, payload, opts={})
  post_opts = opts.merge(payload: payload)
  exec_request(uri, "POST", post_opts)
end
Backup Elasticsearch data to a local directory.
This uses ElasticSearch's scroll api to fetch all records for indices and write the data to a local directory. The files it generates are given a .json.data extension. They are not valid JSON files, but rather are in the format expected by ElasticSearch's _bulk api.
So restore simply has to POST the contents of each file as Content-Type: application/x-ndjson
Use the :size option to change the number of results to fetch at once and also the size of the data files generated. Increasing size means larger files and fewer api requests for both backup and the subsequent restore of that data.
@example Backup default Elasticsearch running locally.
ElasticUtil.backup('http://localhost:9300', '/tmp/local-elastic-data')
@param [String] url The url of the Elasticsearch cluster eg. 'localhost:9300'
@param [String] backup_dir The local directory to store data in. eg. '/tmp/es2.4'
@param [Hash] opts The options for this backup.
@option opts [Array] :indices The indices to backup. Default is all.
@option opts [Array] :exclude_indices Exclude certain indexes.
@option opts [Array] :exclude_fields Exclude certain fields. Default is ['_id'].
@option opts [Array] :replace_types Replace certain types with a different type, separated by a colon. eg. 'type1:type2' or 'stat:_doc'
@option opts [String] :scroll The scroll api parameter. Default is '5m'.
@option opts [Integer] :size The size api parameter. Default is 1000.
@option opts [true] :force Delete existing backup directory instead of erroring. Default is false.
@option opts [true] :quiet Don't print anything. Default is false.
@return [true] or raises an error
# File lib/elastic_util.rb, line 54
# Backup Elasticsearch data to a local directory.
#
# Uses the scroll api to page through every document of the selected indices
# and writes bulk-api formatted .json.data files under backup_dir/DUMP_DIR.
#
# @param [String] url The url of the Elasticsearch cluster eg. 'localhost:9300'
# @param [String] backup_dir The local directory to store data in.
# @param [Hash] opts The options for this backup (:indices, :exclude_indices,
#   :exclude_fields, :scroll, :size, :force, :quiet, :dry — see class rdoc).
# @return [true] on success (0 for a dry run) or raises ElasticUtil::Error
def self.backup(url, backup_dir, opts={})
  start_time = Time.now
  url = url.strip.chomp("/")
  backup_dir = backup_dir.strip
  path = File.join(backup_dir.strip, DUMP_DIR)
  indices = []
  if opts[:dry]
    puts "(DRY RUN) Started backup" unless opts[:quiet]
  else
    puts "Started backup" unless opts[:quiet]
  end
  # ping it first
  response = nil
  uri = URI(url)
  response = api_get(uri)
  if !response.is_a?(Net::HTTPSuccess)
    raise Error, "Unable to reach Elasticsearch at url '#{url}'!\n#{response.inspect}\n#{response.body.to_s}"
  end
  # determine indices to backup, default is everything.
  if opts[:indices]
    indices = opts[:indices]
  else
    response = nil
    uri = URI(url + "/_cat/indices?format=json")
    response = api_get(uri)
    if !response.is_a?(Net::HTTPSuccess)
      raise Error, "HTTP request failure!\n#{response.inspect}\n#{response.body.to_s}"
    end
    json_response = JSON.parse(response.body)
    json_response.each do |record|
      indices.push(record['index'])
    end
  end
  if opts[:exclude_indices]
    indices = indices.reject {|it| opts[:exclude_indices].include?(it) }
  end
  if indices.empty?
    raise Error, "no indices to back up!"
  end
  opts[:scroll] ||= '5m'
  opts[:size] ||= 1000
  # exclude _id by default.
  if !opts.key?(:exclude_fields)
    opts[:exclude_fields] = ['_id']
  end
  # validate backup path
  # Bug fix: File.exists? was removed in Ruby 3.2; File.exist? is the
  # supported form on all modern rubies.
  if File.exist?(path)
    if opts[:force]
      FileUtils.rmtree(path)
    else
      raise Error, "backup path '#{path}' already exists! \nDelete it first or use --force"
    end
  end
  FileUtils.mkdir_p(path)
  if opts[:dry]
    indices.each_with_index do |index_name, i|
      puts "(#{i+1}/#{indices.size}) backing up index #{index_name}" unless opts[:quiet]
    end
    puts "(DRY RUN) Finished backup of Elasticsearch #{url} to directory #{backup_dir} (took #{(Time.now-start_time).round(3)}s)" unless opts[:quiet]
    return 0
  end
  # dump data
  indices.each_with_index do |index_name, i|
    puts "(#{i+1}/#{indices.size}) backing up index #{index_name}" unless opts[:quiet]
    # initial request opens the scroll context
    file_index = 0
    uri = URI(url + "/#{index_name}/_search")
    params = {
      :format => "json",
      :scroll => opts[:scroll],
      :size => opts[:size],
      :sort => ["_doc"]
    }
    uri.query = URI.encode_www_form(params)
    # puts "HTTP REQUEST #{uri.inspect}"
    response = api_get(uri)
    if !response.is_a?(Net::HTTPSuccess)
      raise Error, "HTTP request failure!\n#{response.inspect}\n#{response.body.to_s}"
    end
    json_response = JSON.parse(response.body)
    raise Error, "No scroll_id returned in response:\n#{response.inspect}" unless json_response['_scroll_id']
    scroll_id = json_response['_scroll_id']
    hits = json_response['hits']['hits']
    save_bulk_data(path, hits, nil, opts)
    file_index = 1
    # scroll requests page through the remainder until a page comes back empty
    while !hits.empty?
      uri = URI(url + "/_search/scroll")
      params = {
        :scroll_id => scroll_id,
        :scroll => opts[:scroll]
      }
      uri.query = URI.encode_www_form(params)
      # puts "HTTP REQUEST #{uri.inspect}"
      response = api_get(uri)
      if !response.is_a?(Net::HTTPSuccess)
        raise Error, "HTTP request failure!\n#{response.inspect}\n#{response.body.to_s}"
      end
      json_response = JSON.parse(response.body)
      raise Error, "No scroll_id returned in response:\n#{response.inspect}\n#{response.body.to_s}" unless json_response['_scroll_id']
      scroll_id = json_response['_scroll_id']
      hits = json_response['hits']['hits']
      save_bulk_data(path, hits, file_index, opts)
      file_index += 1
    end
  end
  puts "Finished backup of Elasticsearch #{url} to directory #{backup_dir} (took #{(Time.now-start_time).round(3)}s)" unless opts[:quiet]
  return true
end
# File lib/elastic_util.rb, line 274
# Execute an HTTP request and return the raw response.
#
# @param [URI, String] uri The request URI.
# @param [String] http_method One of "GET", "POST", "PUT", "DELETE".
# @param [Hash] opts The request options.
# @option opts [Hash] :headers Request headers.
# @option opts [String] :payload Request body (alias :body).
# @option opts [Integer] :read_timeout Read timeout in seconds. Default is 900.
# @option opts [Integer] :open_timeout Open timeout in seconds. Default is 10.
# @return [Net::HTTPResponse]
def self.exec_request(uri, http_method="GET", opts={})
  # parse request URI and options
  uri = uri.is_a?(URI) ? uri : URI(uri)
  http_method = http_method.to_s.upcase
  headers = opts[:headers] || {}
  payload = opts[:payload] || opts[:body]
  http = Net::HTTP.new(uri.host, uri.port)
  if uri.scheme == 'https'
    http.use_ssl = true
    # todo: always ignore ssl errors for now, but this should be an option
    # http.verify_mode = OpenSSL::SSL::VERIFY_PEER
    http.verify_mode = OpenSSL::SSL::VERIFY_NONE
  end
  http.read_timeout = opts[:read_timeout] || (60*15)
  http.open_timeout = opts[:open_timeout] || 10
  request = nil
  if http_method == "GET"
    request = Net::HTTP::Get.new uri.request_uri
  elsif http_method == "POST"
    request = Net::HTTP::Post.new uri.request_uri
    request.body = payload if payload
  elsif http_method == "PUT"
    request = Net::HTTP::Put.new uri.request_uri
    request.body = payload if payload
  elsif http_method == "DELETE"
    request = Net::HTTP::Delete.new uri.request_uri
  else
    raise "HTTP method is unknown: '#{http_method}'"
  end
  # set default Content-Type when sending a body.
  # Bug fix: the original assigned this default into `headers` AFTER the
  # headers had already been copied onto the request, so the default
  # Content-Type was silently dropped and never sent.
  if payload && headers['Content-Type'].nil?
    headers['Content-Type'] = "application/json"
  end
  # set headers
  headers.each { |k,v| request[k] = v }
  # todo: set default Accept: application/json (probably, right?)
  # set basic auth
  if uri.user
    request.basic_auth uri.user, uri.password
  end
  # execute request and return the resulting Net::HTTPResponse
  response = http.request(request)
  return response
end
Restore Elasticsearch data from a backup. This will do a POST to the _bulk api for each file in the backup directory.
@example Restore local cluster with our backup.
ElasticUtil.restore('http://localhost:9301', '/tmp/local-elastic-data')
@param [String] url The url of the Elasticsearch cluster eg. 'localhost:9200'
@param [String] backup_dir The backup directory.
@param [Hash] opts The options for this restore.
@option opts [true] :quiet Don't print anything. Default is false.
@return [true] or raises an error
# File lib/elastic_util.rb, line 189
# Restore Elasticsearch data from a backup.
#
# POSTs the contents of each .json.data file in the backup directory to the
# _bulk api with Content-Type: application/x-ndjson.
#
# @param [String] url The url of the Elasticsearch cluster eg. 'localhost:9200'
# @param [String] backup_dir The backup directory created by backup().
# @param [Hash] opts The options for this restore (:quiet, :dry).
# @return [true] on success (0 for a dry run) or raises ElasticUtil::Error
def self.restore(url, backup_dir, opts={})
  start_time = Time.now
  url = url.strip.chomp("/")
  backup_dir = backup_dir.strip
  path = File.join(backup_dir.strip, DUMP_DIR)
  if opts[:dry]
    puts "(DRY RUN) Started restore" unless opts[:quiet]
  else
    puts "Started restore" unless opts[:quiet]
  end
  # validate backup path
  # Bug fix: Dir.exists? was removed in Ruby 3.2; Dir.exist? is the
  # supported form on all modern rubies.
  if !Dir.exist?(path)
    raise Error, "backup path '#{backup_dir}' does not exist!"
  end
  # ping it first
  uri = URI(url)
  response = api_get(uri)
  if !response.is_a?(Net::HTTPSuccess)
    raise Error, "Unable to reach Elasticsearch at url '#{url}'!\n#{response.inspect}\n#{response.body.to_s}"
  end
  # find files to import
  found_files = Dir[File.join(path, '**', '*.json.data' )]
  if found_files.empty?
    # Bug fix: the original raised "backup path ... does not exist!" here,
    # which is misleading — the path exists but contains no data files.
    raise Error, "no .json.data files found in backup path '#{backup_dir}'!"
  else
    puts "Found #{found_files.size} files to import" unless opts[:quiet]
  end
  if opts[:dry]
    found_files.each_with_index do |file, i|
      puts "(#{i+1}/#{found_files.size}) bulk importing file #{file}" unless opts[:quiet]
    end
    puts "(DRY RUN) Finished restore of Elasticsearch #{url} with backup #{backup_dir} (took #{(Time.now-start_time).round(3)}s)" unless opts[:quiet]
    return 0
  end
  # bulk api request for each file
  found_files.each_with_index do |file, i|
    puts "(#{i+1}/#{found_files.size}) bulk importing file #{file}" unless opts[:quiet]
    payload = File.read(file)
    uri = URI(url + "/_bulk")
    response = api_post(uri, payload, {:headers => {"Content-Type" => "application/x-ndjson"} })
    if !response.is_a?(Net::HTTPSuccess)
      raise Error, "HTTP request failure!\n#{response.inspect}\n#{response.body.to_s}"
    end
  end
  puts "Finished restore of Elasticsearch #{url} with backup #{backup_dir} (took #{(Time.now-start_time).round(3)}s)" unless opts[:quiet]
  return true
end