module ElasticUtil

This module provides a way to backup and restore Elasticsearch data.

@example Backup data from one elasticsearch cluster and restore it to another.

ElasticUtil.backup('http://localhost:9300', '/tmp/local-elastic-data', {size:5000})
ElasticUtil.restore('http://localhost:9301', '/tmp/local-elastic-data')

Constants

DUMP_DIR

The name of the data directory, relative to the user provided backup directory.

VERSION

Public Class Methods

api_get(uri, opts={}) click to toggle source
# File lib/elastic_util.rb, line 320
def self.api_get(uri, opts={})
  exec_request(uri, "GET", opts={})
end
api_post(uri, payload, opts={}) click to toggle source
# File lib/elastic_util.rb, line 324
def self.api_post(uri, payload, opts={})
  exec_request(uri, "POST", opts.merge({payload:payload}))
end
backup(url, backup_dir, opts={}) click to toggle source

Backup Elasticsearch data to a local directory.

This uses ElasticSearch's scroll api to fetch all records for indices and write the data to a local directory. The files it generates are given a .json.data extension. They are not valid JSON files, but rather are in the format expected by ElasticSearch's _bulk api.

So restore simply has to POST the contents of each file as Content-Type: application/x-ndjson

Use the :size option to change the number of results to fetch at once and also the size of the data files generated. Increasing size means larger files and fewer api requests for both backup and the subsequent restore of that data.

@example Backup default Elasticsearch running locally.

ElasticUtil.backup('http://localhost:9300', '/tmp/local-elastic-data')

@param [String] url The url of the Elasticsearch cluster eg. 'localhost:9300' @param [String] backup_dir The local directory to store data in. eg. '/tmp/es2.4' @param [Hash] opts The options for this backup. @option opts [Array] :indices The indices to backup. Default is all. @option opts [Array] :exclude_indices Exclude certain indexes. @option opts [Array] :exclude_fields Exclude certain fields. Default is ['_id']. @option opts [Array] :replace_types Replace certain types with a different type, separated by a colon. eg. 'type1:type2' or 'stat:_doc' @option opts [String] :scroll The scroll api parameter, Default is '5m'. @option opts [Integer] :size The size api parameter. Default is 1000. @option opts [true] :force Delete existing backup directory instead of erroring. Default is false. @option opts [true] :quiet Don't print anything. Default is false.

@return [true] or raises an error

# File lib/elastic_util.rb, line 54
def self.backup(url, backup_dir, opts={})
  start_time = Time.now
  url = url.strip.chomp("/")
  backup_dir = backup_dir.strip
  path = File.join(backup_dir.strip, DUMP_DIR)
  indices = []

  if opts[:dry]
    puts "(DRY RUN) Started backup" unless opts[:quiet]
  else
    puts "Started backup" unless opts[:quiet]
  end

  # ping it first
  response = nil
  uri = URI(url)
  response = api_get(uri)
  if !response.is_a?(Net::HTTPSuccess)
    raise Error, "Unable to reach Elasticsearch at url '#{url}'!\n#{response.inspect}\n#{response.body.to_s}"
  end

  # determine indices to backup, default is everything.
  if opts[:indices]
    indices = opts[:indices]
  else
    response = nil
    uri = URI(url + "/_cat/indices?format=json")
    response = api_get(uri)
    if !response.is_a?(Net::HTTPSuccess)
      raise Error, "HTTP request failure!\n#{response.inspect}\n#{response.body.to_s}"
    end
    json_response = JSON.parse(response.body)
    json_response.each do |record|
      indices.push(record['index'])
    end
  end
  if opts[:exclude_indices]
    indices = indices.reject {|it| opts[:exclude_indices].include?(it) }
  end

  if indices.empty?
    raise Error, "no indices to back up!"
  end
  
  opts[:scroll] ||= '5m'
  opts[:size] ||= 1000
  
  # exclude _id by default.
  if !opts.key?(:exclude_fields)
    opts[:exclude_fields] = ['_id']
  end

  # validate backup path
  if File.exists?(path)
    if opts[:force]
      FileUtils.rmtree(path)
    else
      raise Error, "backup path '#{path}' already exists! Delete it first or use --force"
    end
  end
  FileUtils.mkdir_p(path)

  if opts[:dry]
    indices.each_with_index do |index_name, i|
      puts "(#{i+1}/#{indices.size}) backing up index #{index_name}" unless opts[:quiet]
    end
    puts "(DRY RUN) Finished backup of Elasticsearch #{url} to directory #{backup_dir} (took #{(Time.now-start_time).round(3)}s)" unless opts[:quiet]
    return 0
  end

  # dump data
  indices.each_with_index do |index_name, i|
    puts "(#{i+1}/#{indices.size}) backing up index #{index_name}" unless opts[:quiet]
    # initial request
    file_index = 0
    uri = URI(url + "/#{index_name}/_search")
    params = { 
      :format => "json",
      :scroll => opts[:scroll], 
      :size => opts[:size],
      :sort => ["_doc"] 
    }
    uri.query = URI.encode_www_form(params)
    # puts "HTTP REQUEST #{uri.inspect}"
    response = api_get(uri)
    if !response.is_a?(Net::HTTPSuccess)
      raise Error, "HTTP request failure!\n#{response.inspect}\n#{response.body.to_s}"
    end
    json_response = JSON.parse(response.body)
    raise Error, "No scroll_id returned in response:\n#{response.inspect}" unless json_response['_scroll_id']
    scroll_id = json_response['_scroll_id']
    hits = json_response['hits']['hits']
    save_bulk_data(path, hits, nil, opts)
    
    file_index = 1
    # scroll requests
    while !hits.empty?
      uri = URI(url + "/_search/scroll")
      params = {
        :scroll_id => scroll_id, 
        :scroll => opts[:scroll]
      }
      uri.query = URI.encode_www_form(params)
      # puts "HTTP REQUEST #{uri.inspect}"
      response = api_get(uri)
      if !response.is_a?(Net::HTTPSuccess)
        raise Error, "HTTP request failure!\n#{response.inspect}\n#{response.body.to_s}"
      end
      json_response = JSON.parse(response.body)
      raise Error, "No scroll_id returned in response:\n#{response.inspect}\n#{response.body.to_s}" unless json_response['_scroll_id']
      scroll_id = json_response['_scroll_id']
      hits = json_response['hits']['hits']
      save_bulk_data(path, hits, file_index, opts)
      file_index += 1
    end
  end
  
  puts "Finished backup of Elasticsearch #{url} to directory #{backup_dir} (took #{(Time.now-start_time).round(3)}s)" unless opts[:quiet]
  return true
end
exec_request(uri, http_method="GET", opts={}) click to toggle source
# File lib/elastic_util.rb, line 274
def self.exec_request(uri, http_method="GET", opts={})
  # parse request URI and options
  uri = uri.is_a?(URI) ? uri : URI(uri)
  http_method = http_method.to_s.upcase
  headers = opts[:headers] || {}
  payload = opts[:payload] || opts[:body]
  http = Net::HTTP.new(uri.host, uri.port)
  if uri.scheme == 'https'
    http.use_ssl = true
    # todo: always ignore ssl errors for now, but this should be an option
    # http.verify_mode = OpenSSL::SSL::VERIFY_PEER
    http.verify_mode = OpenSSL::SSL::VERIFY_NONE
  end
  http.read_timeout = opts[:read_timeout] || (60*15)
  http.open_timeout = opts[:open_timeout] || 10
  request = nil
  if http_method == "GET"
    request = Net::HTTP::Get.new uri.request_uri
  elsif http_method == "POST"
    request = Net::HTTP::Post.new uri.request_uri
    request.body = payload if payload
  elsif http_method == "PUT"
    request = Net::HTTP::Put.new uri.request_uri
    request.body = payload if payload
  elsif http_method == "DELETE"
    request = Net::HTTP::Delete.new uri.request_uri
  else
    raise "HTTP method is unknown: '#{http_method}'"
  end
  # set headers
  headers.each { |k,v| request[k] = v }
  # todo: set default Accept: application/json  (probably, right?)
  # set default Content-Type
  if payload && headers['Content-Type'].nil?
    headers['Content-Type'] = "application/json"
  end
  # set basic auth
  if uri.user
    request.basic_auth uri.user, uri.password
  end
  # execute request
  response = http.request(request)
  # return the resulting Net::HTTPResponse
  return response
end
restore(url, backup_dir, opts={}) click to toggle source

Restore Elasticsearch data from a backup. This will do a POST to the _bulk api for each file in the backup directory.

@example Restore local cluster with our backup.

ElasticUtil.restore('http://localhost:9301', '/tmp/local-elastic-data')

@param [String] url The url of the Elasticsearch cluster eg. 'localhost:9200'. @param [String] backup_dir The backup directory. @param [Hash] opts The options for this backup. @option opts [true] :quiet Don't print anything. Default is false.

@return [true] or raises an error

# File lib/elastic_util.rb, line 189
def self.restore(url, backup_dir, opts={})
  start_time = Time.now
  url = url.strip.chomp("/")
  backup_dir = backup_dir.strip
  path = File.join(backup_dir.strip, DUMP_DIR)

  if opts[:dry]
    puts "(DRY RUN) Started restore" unless opts[:quiet]
  else
    puts "Started restore" unless opts[:quiet]
  end

  # validate backup path
  if !Dir.exists?(path)
    raise Error, "backup path '#{backup_dir}' does not exist!"
  end

  # ping it first
  uri = URI(url)
  response = api_get(uri)
  if !response.is_a?(Net::HTTPSuccess)
    raise Error, "Unable to reach Elasticsearch at url '#{url}'!\n#{response.inspect}\n#{response.body.to_s}"
  end

  # find files to import
  found_files = Dir[File.join(path, '**', '*.json.data' )]
  if found_files.empty?
    raise Error, "backup path '#{backup_dir}' does not exist!"
  else
    puts "Found #{found_files.size} files to import" unless opts[:quiet]
  end

  if opts[:dry]
    found_files.each_with_index do |file, i|
      puts "(#{i+1}/#{found_files.size}) bulk importing file #{file}" unless opts[:quiet]
    end
    puts "(DRY RUN) Finished restore of Elasticsearch #{url} with backup #{backup_dir} (took #{(Time.now-start_time).round(3)}s)" unless opts[:quiet]
    return 0
  end

  # bulk api request for each file
  found_files.each_with_index do |file, i|
    puts "(#{i+1}/#{found_files.size}) bulk importing file #{file}" unless opts[:quiet]
    payload = File.read(file)
    uri = URI(url + "/_bulk")
    response = api_post(uri, payload, {:headers => {"Content-Type" => "application/x-ndjson"} })
    if !response.is_a?(Net::HTTPSuccess)
      raise Error, "HTTP request failure!\n#{response.inspect}\n#{response.body.to_s}"
    end
  end

  puts "Finished restore of Elasticsearch #{url} with backup #{backup_dir} (took #{(Time.now-start_time).round(3)}s)" unless opts[:quiet]
  return true
end