class Elastic::Core::Connector

Constants

DEFAULT_TYPE

Public Class Methods

new(_name, _mapping, settling_time: 10.seconds) click to toggle source
# File lib/elastic/core/connector.rb, line 5
def initialize(_name, _mapping, settling_time: 10.seconds)
  @name = _name
  @mapping = _mapping
  @settling_time = settling_time
end

Public Instance Methods

bulk_index(_documents) click to toggle source
# File lib/elastic/core/connector.rb, line 65
def bulk_index(_documents)
  return if Elastic.config.disable_indexing

  # TODO: validate documents type
  body = _documents.map { |doc| { 'index' => doc.merge('_type' => DEFAULT_TYPE) } }

  write_indices.each do |write_index|
    retry_on_temporary_error('bulk indexing') do
      api.bulk(index: write_index, body: body)
    end
  end
end
copy_to(_to, batch_size: nil) click to toggle source
# File lib/elastic/core/connector.rb, line 137
def copy_to(_to, batch_size: nil) # rubocop:disable Metrics/AbcSize
  api.indices.refresh index: index_name

  r = api.search(
    index: index_name,
    body: { sort: ['_doc'] },
    scroll: '5m',
    size: batch_size || default_batch_size
  )

  count = 0
  while !r['hits']['hits'].empty?
    count += r['hits']['hits'].count
    Elastic.logger.info "Copied #{count} docs"

    body = r['hits']['hits'].map { |h| transform_hit_to_create(h) }
    api.bulk(index: _to, body: body)

    r = api.scroll scroll: '5m', scroll_id: r['_scroll_id']
  end
end
count(query: nil) click to toggle source
# File lib/elastic/core/connector.rb, line 106
def count(query: nil)
  api.count(index: index_name, body: query)['count']
end
delete(_document) click to toggle source
# File lib/elastic/core/connector.rb, line 78
def delete(_document)
  raise ArgumentError, 'document must provide an id' unless _document['_id']

  return if Elastic.config.disable_indexing

  write_index, rolling_index = write_indices

  operations = [{
    'delete' => _document.merge('_index' => write_index, '_type' => DEFAULT_TYPE)
  }]

  if rolling_index
    operations << {
      'delete' => _document.merge('_index' => rolling_index, '_type' => DEFAULT_TYPE)
    }
  end

  api.bulk(body: operations)
end
drop() click to toggle source
# File lib/elastic/core/connector.rb, line 25
def drop
  api.indices.delete index: "#{index_name}:*"
  nil
end
find(_id) click to toggle source
# File lib/elastic/core/connector.rb, line 102
def find(_id)
  api.get(index: index_name, id: _id)
end
index(_document) click to toggle source
# File lib/elastic/core/connector.rb, line 54
def index(_document)
  return if Elastic.config.disable_indexing

  # TODO: validate document type
  operations = write_indices.map do |write_index|
    { 'index' => _document.merge('_index' => write_index, '_type' => DEFAULT_TYPE) }
  end

  api.bulk(body: operations)
end
index_name() click to toggle source
# File lib/elastic/core/connector.rb, line 11
def index_name
  @index_name ||= "#{Elastic.config.index}_#{@name}"
end
migrate(batch_size: nil) click to toggle source
# File lib/elastic/core/connector.rb, line 45
def migrate(batch_size: nil)
  unless remap
    rollover do |new_index|
      copy_to new_index, batch_size: batch_size
    end
  end
  nil
end
query(query: nil) click to toggle source
# File lib/elastic/core/connector.rb, line 110
def query(query: nil)
  api.search(index: index_name, body: query)
end
refresh() click to toggle source
# File lib/elastic/core/connector.rb, line 98
def refresh
  api.indices.refresh index: index_name
end
remap() click to toggle source
# File lib/elastic/core/connector.rb, line 30
def remap
  case status
  when :not_available
    create_from_scratch
  when :not_synchronized
    begin
      setup_index_mapping resolve_actual_index_name
    rescue Elasticsearch::Transport::Transport::Errors::BadRequest
      return false
    end
  end

  true
end
remove_orphaned_indices() click to toggle source
# File lib/elastic/core/connector.rb, line 159
def remove_orphaned_indices
  _, rolling_index = resolve_write_indices

  unless rolling_index.nil?
    Elastic.logger.info "Removing orphan index #{rolling_index}"
    api.indices.delete index: rolling_index
  end
end
rollover(&_block) click to toggle source
# File lib/elastic/core/connector.rb, line 114
def rollover(&_block) # rubocop:disable Metrics/MethodLength
  actual_index, rolling_index = resolve_write_indices

  unless rolling_index.nil?
    raise Elastic::RolloverError, 'rollover process already in progress'
  end

  new_index = create_index_w_mapping

  begin
    transfer_alias(write_index_alias, to: new_index)
    wait_for_index_to_stabilize
    perform_optimized_write_on(new_index, &_block)
    transfer_alias(index_name, from: actual_index, to: new_index)
    transfer_alias(write_index_alias, from: actual_index)
    wait_for_index_to_stabilize
    api.indices.delete index: actual_index
  rescue StandardError
    api.indices.delete index: new_index
    raise
  end
end
status() click to toggle source
# File lib/elastic/core/connector.rb, line 15
def status
  return :ready if Elastic.config.disable_indexing

  actual_name = resolve_actual_index_name
  return :not_available if actual_name.nil?
  return :not_synchronized unless mapping_synchronized? actual_name

  :ready
end

Private Instance Methods

api() click to toggle source
# File lib/elastic/core/connector.rb, line 177
def api
  Elastic.config.api_client
end
configure_index(_index, _settings) click to toggle source
# File lib/elastic/core/connector.rb, line 269
def configure_index(_index, _settings)
  api.indices.put_settings index: _index, body: { index: _settings }
end
create_from_scratch() click to toggle source
# File lib/elastic/core/connector.rb, line 233
def create_from_scratch
  new_index = create_index_w_mapping
  api.indices.update_aliases(
    body: {
      actions: [
        { add: { index: new_index, alias: index_name } },
        { add: { index: new_index, alias: write_index_alias } }
      ]
    }
  )
end
create_index_w_mapping() click to toggle source
# File lib/elastic/core/connector.rb, line 225
def create_index_w_mapping
  new_name = "#{index_name}:#{Time.now.to_i}"
  api.indices.create index: new_name
  api.cluster.health wait_for_status: 'yellow'
  setup_index_mapping new_name
  new_name
end
default_batch_size() click to toggle source
# File lib/elastic/core/connector.rb, line 283
def default_batch_size
  1_000
end
mapping_synchronized?(_index) click to toggle source
# File lib/elastic/core/connector.rb, line 245
def mapping_synchronized?(_index)
  type_mappings = api.indices.get_mapping(index: _index, include_type_name: false)
  return false if type_mappings[_index].nil?
  return false if type_mappings[_index]['mappings'].empty?

  diff = Elastic::Commands::CompareMappings.for(
    current: type_mappings[_index]['mappings'],
    user: @mapping
  )

  diff.empty?
end
perform_optimized_write_on(_index) { |_index| ... } click to toggle source
# File lib/elastic/core/connector.rb, line 185
def perform_optimized_write_on(_index)
  old_indices = Thread.current[write_index_thread_override]
  Thread.current[write_index_thread_override] = [_index]
  configure_index(_index, refresh_interval: -1)
  yield _index
ensure
  configure_index(_index, refresh_interval: '1s')
  Thread.current[write_index_thread_override] = old_indices
end
resolve_actual_index_name() click to toggle source
# File lib/elastic/core/connector.rb, line 218
def resolve_actual_index_name
  result = api.indices.get_alias(name: index_name)
  result.keys.first
rescue Elasticsearch::Transport::Transport::Errors::NotFound
  nil
end
resolve_write_indices() click to toggle source
# File lib/elastic/core/connector.rb, line 203
def resolve_write_indices
  @write_indices = nil if Elastic.config.disable_index_name_caching || write_indices_expired?
  @write_indices ||= begin
    result = api.indices.get_alias(name: write_index_alias)
    @write_indices_expiration = @settling_time.from_now
    result.keys.sort # lower timestamp first (actual)
  rescue Elasticsearch::Transport::Transport::Errors::NotFound
    raise Elastic::MissingIndexError, 'index does not exist, call migrate first'
  end
end
retry_on_temporary_error(_action, retries: 3) { || ... } click to toggle source
# File lib/elastic/core/connector.rb, line 287
def retry_on_temporary_error(_action, retries: 3)
  yield
rescue Elasticsearch::Transport::Transport::Errors::ServiceUnavailable,
       Elasticsearch::Transport::Transport::Errors::GatewayTimeout => exc
  raise if retries <= 0

  Elastic.logger.warn("#{exc.class} error during '#{_action}', retrying!")
  retries -= 1
  retry
end
setup_index_mapping(_index) click to toggle source
# File lib/elastic/core/connector.rb, line 258
def setup_index_mapping(_index)
  api.indices.put_mapping(index: _index, type: DEFAULT_TYPE, body: @mapping)
end
transfer_alias(_alias, from: nil, to: nil) click to toggle source
# File lib/elastic/core/connector.rb, line 262
def transfer_alias(_alias, from: nil, to: nil)
  actions = []
  actions << { remove: { index: from, alias: _alias } } if from
  actions << { add: { index: to, alias: _alias } } if to
  api.indices.update_aliases body: { actions: actions }
end
transform_hit_to_create(_hit) click to toggle source
# File lib/elastic/core/connector.rb, line 273
def transform_hit_to_create(_hit)
  {
    'create' => {
      '_id' => _hit['_id'],
      '_type' => DEFAULT_TYPE,
      'data' => _hit['_source']
    }
  }
end
wait_for_index_to_stabilize() click to toggle source
# File lib/elastic/core/connector.rb, line 170
def wait_for_index_to_stabilize
  return if @settling_time.zero?

  Elastic.logger.info "Waiting #{@settling_time * 1.2}s for write indices to catch up ..."
  sleep(@settling_time * 1.2)
end
write_index_alias() click to toggle source
# File lib/elastic/core/connector.rb, line 199
def write_index_alias
  @write_index_alias ||= "#{index_name}.w"
end
write_index_thread_override() click to toggle source
# File lib/elastic/core/connector.rb, line 195
def write_index_thread_override
  "_elastic_#{index_name}_write_index"
end
write_indices() click to toggle source
# File lib/elastic/core/connector.rb, line 181
def write_indices
  Thread.current[write_index_thread_override] || resolve_write_indices
end
write_indices_expired?() click to toggle source
# File lib/elastic/core/connector.rb, line 214
def write_indices_expired?
  @write_indices_expiration && @write_indices_expiration < Time.current
end