class Request::Elastic

Wrapper around Elasticsearch HTTP requests: index, snapshot and cluster-health calls, with retry handling.

Constants

RETRY_ERRORS

Public Class Methods

new(config) click to toggle source
# File lib/elastic_manager/request.rb, line 22
# Builds the JSON HTTP client and stores the connection settings.
#
# @param config [Hash] reads config['timeout'] ('write', 'connect', 'read'),
#   config['es']['url'], config['retry'] and config['sleep']; numeric values
#   are coerced with to_i
def initialize(config)
  timeouts = config['timeout']
  json_headers = {
    'Accept':       'application/json',
    'Content-type': 'application/json'
  }

  @client = HTTP
            .timeout(
              write:   timeouts['write'].to_i,
              connect: timeouts['connect'].to_i,
              read:    timeouts['read'].to_i
            )
            .headers(json_headers)

  @url   = config['es']['url']
  @retry = config['retry'].to_i
  @sleep = config['sleep'].to_i
end

Public Instance Methods

all_indices(from=nil, to=nil, daysago=nil, state=nil, type=nil, config) click to toggle source
# File lib/elastic_manager/request.rb, line 124
# Lists the CGI-escaped names of indices selected by state, box_type and date.
#
# The date of an index is parsed from the part of its name after the last '-';
# names whose suffix Date.parse rejects are logged and skipped.
#
# @param from [Date, nil] start of the date range (presumably a Date - it is
#   compared against a parsed Date); when nil the daysago rule is used instead
# @param to [Date, nil] end of the date range, only consulted when from is set
# @param daysago [#to_i, nil] default age threshold in days; may be replaced
#   per index by override_daysago
# @param state [String, nil] keep only indices whose 'state' equals this value
# @param type [String, nil] keep only indices whose box_type equals this value
# @param config [Hash] passed through to override_daysago
# @return [Array<String>] escaped index names
def all_indices(from=nil, to=nil, daysago=nil, state=nil, type=nil, config)
  indices = get_all_indices

  # TODO: (anton.ryabov) next line just for debug purpose, need better handling
  indices.each { |k, v| log.debug "#{k} - #{v.to_json}" unless v['settings'] }

  indices.select! { |_, v| v['state'] == state } if state
  if type
    # dig instead of chained [] so entries without 'settings' (see the debug
    # line above) are filtered out rather than raising NoMethodError.
    indices.select! do |_, v|
      v.dig('settings', 'index', 'routing', 'allocation', 'require', 'box_type') == type
    end
  end

  result = []
  indices.each_key do |index|
    begin
      # to_s: an empty name yields nil here, which Date.parse rejects with
      # TypeError instead of the ArgumentError handled below.
      index_date = Date.parse(index.split('-').last.to_s)
    rescue ArgumentError => e
      log.error "#{e.message} for #{index}"
      next
    end

    daysago_local = override_daysago(make_index_name(index), config, daysago)

    # The two rules are mutually exclusive. Falling through to (nil..nil).cover?
    # when from is nil would match every date on Rubies with beginless ranges.
    selected =
      if from.nil?
        index_date < (Date.today - daysago_local)
      else
        (from..to).cover?(index_date)
      end

    result << CGI.escape(index) if selected
  end

  result
end
all_indices_in_snapshots(from=nil, to=nil, daysago=nil, config) click to toggle source
# File lib/elastic_manager/request.rb, line 86
# Lists the CGI-escaped index names that have a successful snapshot matching
# the date selection.
#
# Only snapshots whose 'status' is 'SUCCESS' are considered. The date is parsed
# from the part of the snapshot id after the last '-'; ids whose suffix
# Date.parse rejects are logged and skipped. The index name is the snapshot id
# with 'snapshot_' removed.
#
# @param from [Date, nil] start of the date range (presumably a Date); when nil
#   the daysago rule is used instead
# @param to [Date, nil] end of the date range, only consulted when from is set
# @param daysago [#to_i, nil] default age threshold in days; may be replaced
#   per index by override_daysago
# @param config [Hash] passed through to override_daysago
# @return [Array<String>] escaped index names
def all_indices_in_snapshots(from=nil, to=nil, daysago=nil, config)
  successful = get_all_snapshots.select { |snap| snap['status'] == 'SUCCESS' }

  result = []
  successful.each do |snap|
    snapshot_id = snap['id']

    begin
      snap_date = Date.parse(snapshot_id.split('-').last.to_s)
    rescue ArgumentError => e
      # Report the snapshot id: the index name is not derived yet at this
      # point, so referencing it here would raise NameError.
      log.error "#{e.message} for #{snapshot_id}"
      next
    end

    index = snapshot_id.gsub('snapshot_', '')
    daysago_local = override_daysago(make_index_name(index), config, daysago)

    # The two rules are mutually exclusive. Falling through to (nil..nil).cover?
    # when from is nil would match every date on Rubies with beginless ranges.
    selected =
      if from.nil?
        snap_date < (Date.today - daysago_local)
      else
        (from..to).cover?(snap_date)
      end

    result << CGI.escape(index) if selected
  end

  result
end
chill_index(index, box_type) click to toggle source
# File lib/elastic_manager/request.rb, line 304
# Sets index.routing.allocation.require.box_type on an index through
# PUT /<index>/_settings.
#
# @param index [String] index name as it should appear in the request path
# @param box_type [String] value to require for box_type
# @return [Boolean] true only when the response code is 200 and the parsed
#   body has 'acknowledged' set to true; false otherwise
def chill_index(index, box_type)
  settings = { 'index.routing.allocation.require.box_type' => box_type }
  response = request(:put, "/#{index}/_settings?master_timeout=1m", settings)

  unless response.code == 200
    log.fatal "can't chill #{index}, response was: #{response.code} - #{response}"
    return false
  end

  json_parse(response)['acknowledged'].is_a?(TrueClass)
end
close_index(index, tag) click to toggle source
# File lib/elastic_manager/request.rb, line 268
# Closes an index through POST /<index>/_close, unless its current box_type
# equals the given tag.
#
# @param index [String] index name as it should appear in the request path
# @param tag [String] box_type whose indices must not be closed
# @return [Boolean] false when the box_type is unknown, equals tag, or the
#   response code is not 200; otherwise whether 'acknowledged' is true
def close_index(index, tag)
  current_box_type = index_box_type(index)
  return false if current_box_type.nil?

  if current_box_type == tag
    log.fatal "i will not close index #{index} in box_type #{tag}"
    return false
  end

  response = request(:post, "/#{index}/_close?master_timeout=1m")
  unless response.code == 200
    log.fatal "wrong response code for #{index} close"
    return false
  end

  json_parse(response)['acknowledged'].is_a?(TrueClass)
end
delete_index(index, delete_without_snapshot=false) click to toggle source
# File lib/elastic_manager/request.rb, line 320
# Deletes an index through DELETE /<index>.
#
# Unless delete_without_snapshot is truthy, the index is deleted only when
# find_snapshot reports a snapshot named "snapshot_<index>" in the repository
# returned by find_snapshot_repo.
#
# @param index [String] index name as it should appear in the request path
# @param delete_without_snapshot [Boolean] skip the snapshot check
# @return [Boolean] false when the snapshot check fails or the response code is
#   not 200; otherwise whether 'acknowledged' is true
def delete_index(index, delete_without_snapshot=false)
  snapshot_required = !delete_without_snapshot

  if snapshot_required
    expected_snapshot = "snapshot_#{index}"
    repo = find_snapshot_repo
    return false unless find_snapshot(repo, expected_snapshot)
  end

  response = request(:delete, "/#{index}")
  unless response.code == 200
    log.fatal "can't delete index #{index}, response was: #{response.code} - #{response}"
    return false
  end

  json_parse(response)['acknowledged'].is_a?(TrueClass)
end
delete_snapshot(snapshot, repo) click to toggle source
# File lib/elastic_manager/request.rb, line 393
# Deletes a snapshot through DELETE /_snapshot/<repo>/<snapshot>.
#
# @param snapshot [String] snapshot name
# @param repo [String] snapshot repository name
# @return [Boolean] false when the response code is not 200; otherwise whether
#   'acknowledged' in the parsed body is true
def delete_snapshot(snapshot, repo)
  response = request(:delete, "/_snapshot/#{repo}/#{snapshot}")

  unless response.code == 200
    log.fatal "can't delete snapshot #{snapshot}, response was: #{response.code} - #{response}"
    return false
  end

  json_parse(response)['acknowledged'].is_a?(TrueClass)
end
find_snapshot(repo, snapshot_name) click to toggle source
# File lib/elastic_manager/request.rb, line 194
# Looks up a snapshot through GET /_snapshot/<repo>/<snapshot_name>/.
#
# @param repo [String] snapshot repository name
# @param snapshot_name [String] snapshot to look for
# @return [String, false] the 'snapshot' field of the first returned entry when
#   its 'state' is 'SUCCESS'; false when the response code is not 200 or the
#   state differs
def find_snapshot(repo, snapshot_name)
  response = request(:get, "/_snapshot/#{repo}/#{snapshot_name}/")

  if response.code != 200
    log.error "can't find snapshot #{snapshot_name} in #{repo} response was: #{response.code} - #{response}"
    return false
  end

  found = json_parse(response)['snapshots'][0]
  return found['snapshot'] if found['state'] == 'SUCCESS'

  log.error 'wrong snapshot state'
  false
end
find_snapshot_repo() click to toggle source
# File lib/elastic_manager/request.rb, line 182
# Returns the name of the snapshot repository, taken as the first key of the
# GET /_snapshot/ response. Exits the process with status 1 when the response
# code is not 200.
#
# @return [String, nil] first repository name (nil when the response is empty)
def find_snapshot_repo
  # TODO: we need improve this if several snapshot repos used in elastic
  response = request(:get, '/_snapshot/')
  return json_parse(response).keys.first if response.code == 200

  log.fatal "dunno what to do with: #{response.code} - #{response}"
  exit 1
end
get_all_indices() click to toggle source
# File lib/elastic_manager/request.rb, line 154
# Fetches index metadata from /_cluster/state/metadata/, restricted by
# filter_path to each index's state and box_type setting. Exits the process
# with status 1 when the response code is not 200.
#
# @return [Object] the 'metadata' => 'indices' part of the parsed response
#   (presumably a Hash keyed by index name - callers iterate it as key/value)
def get_all_indices
  filters = [
    'metadata.indices.*.state',
    'metadata.indices.*.settings.index.routing.allocation.require.box_type'
  ]
  response = request(:get, "/_cluster/state/metadata/?filter_path=#{filters.join(',')}")

  return json_parse(response)['metadata']['indices'] if response.code == 200

  log.fatal "can't work with all_indices response was: #{response.code} - #{response}"
  exit 1
end
get_all_snapshots() click to toggle source
# File lib/elastic_manager/request.rb, line 112
# Fetches the snapshot list from /_cat/snapshots/<repo>, where <repo> is the
# repository returned by find_snapshot_repo. Exits the process with status 1
# when the response code is not 200.
#
# @return [Object] the parsed response (presumably an Array of Hashes with
#   'id' and 'status' - that is how all_indices_in_snapshots reads it)
def get_all_snapshots
  repo = find_snapshot_repo
  response = request(:get, "/_cat/snapshots/#{repo}")
  return json_parse(response) if response.code == 200

  log.fatal "can't work with all_snapshots response was: #{response.code} - #{response}"
  exit 1
end
green?() click to toggle source
# File lib/elastic_manager/request.rb, line 66
# Tells whether /_cluster/health answers with code 200 and status 'green'.
#
# @return [Boolean]
def green?
  health = request(:get, '/_cluster/health')
  health.code == 200 && json_parse(health)['status'] == 'green'
end
index_box_type(index) click to toggle source
# File lib/elastic_manager/request.rb, line 290
# Reads the index.routing.allocation.require.box_type setting of an index.
#
# The response is looked up under the CGI-unescaped index name, while the
# request path uses the name as given.
#
# @param index [String] index name as it should appear in the request path
# @return [String, nil] the box_type, or nil when the response code is not 200
def index_box_type(index)
  response = request(:get, "/#{index}/_settings/index.routing.allocation.require.box_type")

  unless response.code == 200
    log.fatal "can't check box_type for #{index}, response was: #{response.code} - #{response}"
    return nil
  end

  index_settings = json_parse(response)[CGI.unescape(index)]['settings']
  box_type = index_settings['index']['routing']['allocation']['require']['box_type']
  log.debug "for #{index} box_type is #{box_type}"
  box_type
end
open_index(index) click to toggle source
# File lib/elastic_manager/request.rb, line 255
# Opens an index through POST /<index>/_open.
#
# @param index [String] index name as it should appear in the request path
# @return [Boolean] false when the response code is not 200; otherwise whether
#   'acknowledged' in the parsed body is true
def open_index(index)
  response = request(:post, "/#{index}/_open?master_timeout=1m")

  unless response.code == 200
    log.fatal "wrong response code for #{index} open"
    return false
  end

  json_parse(response)['acknowledged'].is_a?(TrueClass)
end
override_daysago(index_name, config, daysago) click to toggle source
# File lib/elastic_manager/request.rb, line 73
# Resolves the daysago value for an index, preferring a per-index, per-task
# override from the configuration.
#
# The override is read from
# config['settings']['indices'][index_name]['daysago'][config['task']] and is
# used only when every level is present and its string form is not empty.
#
# @param index_name [String] key under config['settings']['indices']
# @param config [Hash] configuration holding 'settings' and 'task'
# @param daysago [#to_i] fallback value
# @return [Integer] the override or the fallback, converted with to_i
def override_daysago(index_name, config, daysago)
  all_indices_cfg = config['settings']['indices']
  index_cfg       = all_indices_cfg && all_indices_cfg[index_name]
  per_task        = index_cfg && index_cfg['daysago']
  override        = per_task && per_task[config['task']]

  return daysago.to_i if !override || override.to_s.empty?

  log.debug "will override daysago for #{index_name} with #{override}"
  override.to_i
end
request(method, url, body={}) click to toggle source
# File lib/elastic_manager/request.rb, line 49
# Sends a JSON request to @url + url through @client, wrapped in with_retry.
#
# @param method [Symbol] HTTP verb passed to the client
# @param url [String] path (and query) appended to the base URL
# @param body [Hash] payload sent as the json: option
# @return [Object] the client's response when no error is raised
# @raise [Request::Throttling] when the response code is 503
# @raise [Request::ServerError] when the response status is another server error
def request(method, url, body={})
  uri = @url + url
  log.debug "uri: #{uri}"

  with_retry do
    @client.request(method, uri, json: body).tap do |response|
      # 503 is checked first: it also counts as a server error.
      raise Request::Throttling.new(response) if response.code == 503
      raise Request::ServerError.new(response) if response.status.server_error?
    end
  end
end
restore_snapshot(index, box_type) click to toggle source
# File lib/elastic_manager/request.rb, line 212
# Restores the snapshot "snapshot_<index>" from the repository returned by
# find_snapshot_repo and waits for the recovery to finish.
#
# The restore request sets zero replicas, a refresh interval of -1 and the
# given box_type on the restored index.
#
# @param index [String] index name
# @param box_type [String] box_type to require for the restored index
# @return [Object, false] false when the snapshot is not found or the restore
#   response code is not 200; otherwise the result of wait_snapshot_restore
def restore_snapshot(index, box_type)
  snapshot_name = "snapshot_#{index}"
  snapshot_repo = find_snapshot_repo
  snapshot      = find_snapshot(snapshot_repo, snapshot_name)
  return false unless snapshot

  restore_settings = {
    'index.number_of_replicas'                  => 0,
    'index.refresh_interval'                    => -1,
    'index.routing.allocation.require.box_type' => box_type
  }
  payload  = { index_settings: restore_settings }
  response = request(:post, "/_snapshot/#{snapshot_repo}/#{snapshot}/_restore", payload)

  unless response.code == 200
    log.error "can't restore snapshot #{snapshot_name} response was: #{response.code} - #{response}"
    return false
  end

  sleep 5
  wait_snapshot_restore(index)
end
snapshot_exist?(snapshot_name, repo) click to toggle source
# File lib/elastic_manager/request.rb, line 169
# Checks whether a snapshot exists through
# GET /_snapshot/<repo>/<snapshot_name>/. Exits the process with status 1 for
# any response code other than 200 and 404.
#
# @param snapshot_name [String] snapshot to look for
# @param repo [String] snapshot repository name
# @return [Boolean] true for code 200, false for code 404
def snapshot_exist?(snapshot_name, repo)
  response = request(:get, "/_snapshot/#{repo}/#{snapshot_name}/")

  case response.code
  when 200 then true
  when 404 then false
  else
    log.fatal "can't check snapshot existing, response was: #{response.code} - #{response}"
    exit 1
  end
end
snapshot_index(index) click to toggle source
# File lib/elastic_manager/request.rb, line 371
# Starts a snapshot named "snapshot_<index>" for a single index in the
# repository returned by find_snapshot_repo, then waits for it to finish.
#
# The request body names the CGI-unescaped index and disables
# ignore_unavailable, include_global_state and partial.
#
# @param index [String] index name as it should appear in the snapshot name
# @return [Object, false] false when the response code is not 200; otherwise
#   the result of wait_snapshot
def snapshot_index(index)
  snapshot_name = "snapshot_#{index}"
  snapshot_repo = find_snapshot_repo

  snapshot_options = {
    'indices'              => CGI.unescape(index),
    'ignore_unavailable'   => false,
    'include_global_state' => false,
    'partial'              => false
  }
  response = request(:put, "/_snapshot/#{snapshot_repo}/#{snapshot_name}/", snapshot_options)

  return wait_snapshot(snapshot_name, snapshot_repo) if response.code == 200

  # TODO: (anton.ryabov) add slack notify due failed snapshot
  log.error "can't snapshot #{index}, response was: #{response.code} - #{response}"
  false
end
wait_snapshot(snapshot, repo) click to toggle source
# File lib/elastic_manager/request.rb, line 340
# Polls /_snapshot/<repo>/<snapshot>/_status until the snapshot state is
# 'SUCCESS', sleeping @sleep / 2 seconds before every poll.
#
# Exits the process with status 1 when the state is FAILED, PARTIAL or
# INCOMPATIBLE. A non-200 response is logged and polled again, with no limit
# on the number of attempts.
#
# @param snapshot [String] snapshot name
# @param repo [String] snapshot repository name
# @return [true] once the snapshot reports SUCCESS
def wait_snapshot(snapshot, repo)
  loop do
    sleep @sleep / 2
    response = request(:get, "/_snapshot/#{repo}/#{snapshot}/_status")

    if response.code != 200
      log.error "can't check snapshot: #{response.code} - #{response}"
      # TODO: (anton.ryabov) we need tries mechanizm here
      next
    end

    # TODO: (anton.ryabov) add logging of percent and time ?
    state = json_parse(response)['snapshots'][0]['state']
    log.debug "snapshot check response: #{response.code} - #{response}"

    break if state == 'SUCCESS'
    next unless %w[FAILED PARTIAL INCOMPATIBLE].include?(state)

    # TODO: (anton.ryabov) add slack notify due failed snapshot
    log.fatal "can't snapshot #{snapshot} in #{repo}: #{response.code} - #{response}"
    exit 1
  end

  true
end
wait_snapshot_restore(index) click to toggle source
# File lib/elastic_manager/request.rb, line 237
# Polls /<index>/_recovery until every shard of the index reports stage
# 'DONE', sleeping @sleep / 2 seconds before every poll.
#
# The response is looked up under the CGI-unescaped index name, as
# index_box_type does, while the request path uses the name as given. A
# non-200 response, or a response without shard data for the index, is logged
# and polled again, with no limit on the number of attempts.
#
# @param index [String] index name as it should appear in the request path
# @return [true] once all shards are DONE
def wait_snapshot_restore(index)
  response_key = CGI.unescape(index)

  loop do
    sleep @sleep / 2
    response = request(:get, "/#{index}/_recovery")

    unless response.code == 200
      log.error "can't check recovery: #{response.code} - #{response}"
      next
    end

    # TODO: (anton.ryabov) add logging of percent and time ?
    # NOTE(review): assumes json_parse returns a Hash (dig is used) - confirm.
    shards = json_parse(response).dig(response_key, 'shards')
    if shards.nil?
      # Keep polling instead of failing with NoMethodError on nil when the
      # response has no entry for this index.
      log.debug "no recovery info for #{index} yet"
      next
    end

    break if shards.all? { |shard| shard['stage'] == 'DONE' }
  end

  true
end
with_retry() { || ... } click to toggle source
# File lib/elastic_manager/request.rb, line 36
# Runs the block, retrying it when one of RETRY_ERRORS is raised.
#
# The block is attempted at most @retry times and always at least once, with a
# pause of @sleep seconds between attempts. When the last attempt fails, the
# backtrace is logged and the process exits with status 1.
#
# @yield the operation to attempt
# @return [Object] the block's value when an attempt succeeds
def with_retry
  # ||= keeps the counter across `retry`, which re-runs the method body.
  tries ||= @retry

  yield
rescue *RETRY_ERRORS => e
  tries -= 1

  # Compare with > 0 rather than zero?: a counter that starts at 0 or below
  # never hits exactly zero after decrementing and would retry forever.
  if tries > 0
    log.warn "tries left #{tries} '''#{e.message}''' sleeping #{@sleep} sec..."
    sleep @sleep
    retry
  end

  log.fatal "backtrace:\n\t#{e.backtrace.join("\n\t")}"
  exit 1
end