class Elastic::Core::Connector
Constants
- DEFAULT_TYPE
Public Class Methods
new(_name, _mapping, settling_time: 10.seconds)
click to toggle source
# File lib/elastic/core/connector.rb, line 5
# Builds a connector, storing its three inputs as instance state.
#
# @param _name stored in @name (interpolated into the index name by #index_name)
# @param _mapping stored in @mapping (sent as the body of put_mapping requests)
# @param settling_time stored in @settling_time; defaults to 10.seconds
def initialize(_name, _mapping, settling_time: 10.seconds)
  @name, @mapping, @settling_time = _name, _mapping, settling_time
end
Public Instance Methods
bulk_index(_documents)
click to toggle source
# File lib/elastic/core/connector.rb, line 65
# Sends a batch of documents to every current write index through the bulk API.
#
# Nothing is sent when indexing is disabled in the configuration, or when the
# batch is empty (a bulk request without operations has nothing to do, so the
# round trip is skipped instead of issuing it once per write index).
#
# @param _documents [Enumerable<Hash>] bulk "index" action payloads; each one is
#   merged with '_type' => DEFAULT_TYPE before being sent
# @return [Array, nil] the write indices that were targeted, or nil when the
#   call was skipped
def bulk_index(_documents)
  return if Elastic.config.disable_indexing

  # TODO: validate documents type
  body = _documents.map { |doc| { 'index' => doc.merge('_type' => DEFAULT_TYPE) } }
  return if body.empty?

  write_indices.each do |write_index|
    # Each index gets its own retry budget for temporary server errors.
    retry_on_temporary_error('bulk indexing') do
      api.bulk(index: write_index, body: body)
    end
  end
end
copy_to(_to, batch_size: nil)
click to toggle source
# File lib/elastic/core/connector.rb, line 137
# Copies every document readable through #index_name into the index +_to+,
# paging through the source with a scroll search and writing each page with
# one bulk request of "create" operations.
#
# The scroll context opened by the search is released when the copy ends,
# whether it finished or raised, instead of being left to expire after its
# 5 minute keep-alive. Failure to release it is logged and otherwise ignored so
# it can never mask the outcome of the copy itself.
#
# @param _to [String] name of the destination index
# @param batch_size [Integer, nil] page size; falls back to #default_batch_size
# @return [nil]
def copy_to(_to, batch_size: nil) # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
  # Make documents indexed so far visible to the search below.
  api.indices.refresh index: index_name

  r = api.search(
    index: index_name,
    body: { sort: ['_doc'] },
    scroll: '5m',
    size: batch_size || default_batch_size
  )
  scroll_id = r['_scroll_id']

  count = 0
  until r['hits']['hits'].empty?
    body = r['hits']['hits'].map { |h| transform_hit_to_create(h) }
    # NOTE(review): the bulk response is not inspected, so per-item failures
    # reported by the server go unnoticed here.
    api.bulk(index: _to, body: body)

    # Report progress only after the batch has actually been written.
    count += body.size
    Elastic.logger.info "Copied #{count} docs"

    r = api.scroll scroll: '5m', scroll_id: scroll_id
    scroll_id = r['_scroll_id'] || scroll_id
  end
ensure
  if scroll_id
    begin
      api.clear_scroll(scroll_id: scroll_id)
    rescue StandardError => exc
      Elastic.logger.warn("#{exc.class} error while clearing scroll, ignoring")
    end
  end
end
count(query: nil)
click to toggle source
# File lib/elastic/core/connector.rb, line 106
# Asks the count API how many documents match +query+ in #index_name.
#
# @param query request body forwarded to the count API (nil sends no body)
# @return the 'count' entry of the API response
def count(query: nil)
  response = api.count(index: index_name, body: query)
  response['count']
end
delete(_document)
click to toggle source
# File lib/elastic/core/connector.rb, line 78
# Removes a document from the write index and, when a second (rolling) write
# index is present, from that one too, using a single bulk request.
#
# @param _document [Hash] must carry an '_id' entry
# @raise [ArgumentError] when the document has no '_id'
# @return the bulk API response, or nil when indexing is disabled
def delete(_document)
  _document['_id'] || raise(ArgumentError, 'document must provide an id')
  return if Elastic.config.disable_indexing

  primary, rolling = write_indices
  targets = [primary]
  targets << rolling if rolling

  operations = targets.map do |target|
    { 'delete' => _document.merge('_index' => target, '_type' => DEFAULT_TYPE) }
  end

  api.bulk(body: operations)
end
drop()
click to toggle source
# File lib/elastic/core/connector.rb, line 25
# Deletes every index whose name matches "<index_name>:*".
#
# The memoized list of write indices is discarded as well: it names indices
# that no longer exist after the delete, and reusing it would direct later
# writes at those names until the cache expired on its own.
#
# @return [nil]
def drop
  api.indices.delete index: "#{index_name}:*"
  @write_indices = nil
  nil
end
find(_id)
click to toggle source
# File lib/elastic/core/connector.rb, line 102
# Fetches a single document by id from #index_name.
#
# @param _id identifier passed to the get API
# @return the get API response
def find(_id)
  client = api
  client.get(index: index_name, id: _id)
end
index(_document)
click to toggle source
# File lib/elastic/core/connector.rb, line 54
# Indexes one document into every current write index with a single bulk
# request (one "index" operation per write index).
#
# @param _document [Hash] bulk "index" action payload; merged with the target
#   '_index' and '_type' => DEFAULT_TYPE
# @return the bulk API response, or nil when indexing is disabled
def index(_document)
  unless Elastic.config.disable_indexing
    # TODO: validate document type
    operations = write_indices.map do |target|
      action = _document.merge('_index' => target, '_type' => DEFAULT_TYPE)
      { 'index' => action }
    end

    api.bulk(body: operations)
  end
end
index_name()
click to toggle source
# File lib/elastic/core/connector.rb, line 11
# Name used to address this connector's index: the configured index value and
# @name joined by an underscore. Computed once and memoized.
#
# @return [String]
def index_name
  return @index_name if @index_name

  @index_name = "#{Elastic.config.index}_#{@name}"
end
migrate(batch_size: nil)
click to toggle source
# File lib/elastic/core/connector.rb, line 45
# Brings the index in line with the current mapping. An in-place #remap is
# tried first; when it reports failure, a #rollover is run and the documents
# are copied into the new index with #copy_to.
#
# @param batch_size [Integer, nil] forwarded to #copy_to
# @return [nil]
def migrate(batch_size: nil)
  rollover { |target| copy_to(target, batch_size: batch_size) } unless remap
  nil
end
query(query: nil)
click to toggle source
# File lib/elastic/core/connector.rb, line 110
# Runs a search against #index_name.
#
# @param query request body forwarded to the search API
# @return the search API response
def query(query: nil)
  client = api
  client.search(index: index_name, body: query)
end
refresh()
click to toggle source
# File lib/elastic/core/connector.rb, line 98
# Issues an index refresh request for #index_name.
#
# @return the refresh API response
def refresh
  indices_api = api.indices
  indices_api.refresh(index: index_name)
end
remap()
click to toggle source
# File lib/elastic/core/connector.rb, line 30
# Tries to make the index match @mapping without moving any data.
#
# * status :not_available    - creates the index and its aliases from scratch
# * status :not_synchronized - pushes the mapping onto the existing index
# * any other status         - nothing to do
#
# @return [Boolean] false when the mapping update is rejected with a
#   BadRequest error, true otherwise
def remap
  current = status

  if current == :not_available
    create_from_scratch
  elsif current == :not_synchronized
    begin
      setup_index_mapping(resolve_actual_index_name)
    rescue Elasticsearch::Transport::Transport::Errors::BadRequest
      return false
    end
  end

  true
end
remove_orphaned_indices()
click to toggle source
# File lib/elastic/core/connector.rb, line 159
# Deletes the second index attached to the write alias, if there is one.
#
# After the delete the memoized write-index list is discarded: it still names
# the index that was just removed, and reusing it would direct later writes at
# that name until the cache expired on its own.
#
# @return the delete API response, or nil when only one write index exists
def remove_orphaned_indices
  _, rolling_index = resolve_write_indices
  return if rolling_index.nil?

  Elastic.logger.info "Removing orphan index #{rolling_index}"
  response = api.indices.delete index: rolling_index
  @write_indices = nil
  response
end
rollover(&_block)
click to toggle source
# File lib/elastic/core/connector.rb, line 114
# Replaces the current index with a freshly created one.
#
# Steps: create the new index, add it to the write alias, let the caller's
# block populate it, move the read alias (#index_name) to it, detach the old
# index from the write alias and finally delete the old index.
#
# On failure the new index is deleted only while the read alias has not been
# moved yet. Once the read alias points at the new index, that index is the
# live one; deleting it (e.g. because removing the old index failed) would
# take the alias and the data down with it, so it is kept and the error is
# simply re-raised.
#
# @yield [new_index] receives the name of the new index so it can be filled
# @raise [Elastic::RolloverError] when the write alias already has two indices
# @return the response of the final delete request
def rollover(&_block) # rubocop:disable Metrics/MethodLength
  actual_index, rolling_index = resolve_write_indices

  unless rolling_index.nil?
    raise Elastic::RolloverError, 'rollover process already in progress'
  end

  new_index = create_index_w_mapping
  switched = false

  begin
    transfer_alias(write_index_alias, to: new_index)
    wait_for_index_to_stabilize
    perform_optimized_write_on(new_index, &_block)

    transfer_alias(index_name, from: actual_index, to: new_index)
    switched = true # from here on new_index serves reads
    transfer_alias(write_index_alias, from: actual_index)
    wait_for_index_to_stabilize
    api.indices.delete index: actual_index
  rescue StandardError
    api.indices.delete index: new_index unless switched
    raise
  end
end
status()
click to toggle source
# File lib/elastic/core/connector.rb, line 15
# Reports the state of the index behind #index_name.
#
# @return [Symbol] :ready when indexing is disabled or the mapping is in sync,
#   :not_available when the alias resolves to no index,
#   :not_synchronized when the stored mapping differs from @mapping
def status
  return :ready if Elastic.config.disable_indexing

  actual_name = resolve_actual_index_name

  if actual_name.nil?
    :not_available
  elsif mapping_synchronized?(actual_name)
    :ready
  else
    :not_synchronized
  end
end
Private Instance Methods
api()
click to toggle source
# File lib/elastic/core/connector.rb, line 177
# Shortcut to the API client held by the configuration.
#
# @return the value of Elastic.config.api_client
def api
  config = Elastic.config
  config.api_client
end
configure_index(_index, _settings)
click to toggle source
# File lib/elastic/core/connector.rb, line 269
# Updates settings of +_index+; the given settings are sent nested under the
# :index key of the request body.
#
# @param _index name of the index to update
# @param _settings [Hash] settings to apply
# @return the put_settings API response
def configure_index(_index, _settings)
  payload = { index: _settings }
  api.indices.put_settings(index: _index, body: payload)
end
create_from_scratch()
click to toggle source
# File lib/elastic/core/connector.rb, line 233
# Creates a new mapped index and attaches both the read alias (#index_name)
# and the write alias (#write_index_alias) to it in one update_aliases call.
#
# @return the update_aliases API response
def create_from_scratch
  new_index = create_index_w_mapping

  actions = [index_name, write_index_alias].map do |alias_name|
    { add: { index: new_index, alias: alias_name } }
  end

  api.indices.update_aliases(body: { actions: actions })
end
create_index_w_mapping()
click to toggle source
# File lib/elastic/core/connector.rb, line 225
# Creates an index named "<index_name>:<unix timestamp>", waits for the
# cluster health call with wait_for_status 'yellow' to return, then applies
# @mapping to it.
#
# @return [String] the name of the new index
def create_index_w_mapping
  "#{index_name}:#{Time.now.to_i}".tap do |new_name|
    api.indices.create(index: new_name)
    api.cluster.health(wait_for_status: 'yellow')
    setup_index_mapping(new_name)
  end
end
default_batch_size()
click to toggle source
# File lib/elastic/core/connector.rb, line 283
# Page size used by #copy_to when the caller does not pass one.
#
# @return [Integer] always 1000
def default_batch_size
  1000
end
mapping_synchronized?(_index)
click to toggle source
# File lib/elastic/core/connector.rb, line 245
# Tells whether the mapping stored for +_index+ matches @mapping.
#
# @param _index [String] concrete index name (used as key into the response)
# @return [Boolean] false when the response has no entry for the index or its
#   'mappings' are empty; otherwise whether CompareMappings reports no
#   differences
def mapping_synchronized?(_index)
  entry = api.indices.get_mapping(index: _index, include_type_name: false)[_index]
  return false if entry.nil?

  current = entry['mappings']
  return false if current.empty?

  Elastic::Commands::CompareMappings.for(current: current, user: @mapping).empty?
end
perform_optimized_write_on(_index) { |_index| ... }
click to toggle source
# File lib/elastic/core/connector.rb, line 185
# Runs the given block with writes on the current thread redirected to
# +_index+ only, and with that index's refresh_interval set to -1 for the
# duration of the block.
#
# Afterwards the refresh interval is set to '1s' and the previous thread-local
# override is put back. The override is restored in its own ensure so it is
# reset even when the settings request itself fails; otherwise the thread
# would keep writing to +_index+ after this method has raised.
#
# @param _index [String] index that receives the writes
# @yield [_index] the work to perform while the override is active
# @return the value returned by the block
def perform_optimized_write_on(_index)
  old_indices = Thread.current[write_index_thread_override]
  Thread.current[write_index_thread_override] = [_index]
  configure_index(_index, refresh_interval: -1)
  yield _index
ensure
  begin
    configure_index(_index, refresh_interval: '1s')
  ensure
    Thread.current[write_index_thread_override] = old_indices
  end
end
resolve_actual_index_name()
click to toggle source
# File lib/elastic/core/connector.rb, line 218
# Looks up which concrete index the #index_name alias points at.
#
# @return the first key of the get_alias response, or nil when the lookup
#   raises a NotFound error
def resolve_actual_index_name
  api.indices.get_alias(name: index_name).keys.first
rescue Elasticsearch::Transport::Transport::Errors::NotFound
  nil
end
resolve_write_indices()
click to toggle source
# File lib/elastic/core/connector.rb, line 203
# Returns the sorted names of the indices attached to the write alias.
#
# The result is memoized in @write_indices. The memo is dropped first when
# index name caching is disabled in the configuration or when the stored
# expiration time has passed; each fresh lookup sets the expiration to
# @settling_time from now.
#
# @raise [Elastic::MissingIndexError] when the alias lookup raises NotFound
# @return [Array] index names in ascending order
def resolve_write_indices
  stale = Elastic.config.disable_index_name_caching || write_indices_expired?
  @write_indices = nil if stale
  return @write_indices if @write_indices

  begin
    aliases = api.indices.get_alias(name: write_index_alias)
    @write_indices_expiration = @settling_time.from_now
    @write_indices = aliases.keys.sort # lower timestamp first (actual)
  rescue Elasticsearch::Transport::Transport::Errors::NotFound
    raise Elastic::MissingIndexError, 'index does not exist, call migrate first'
  end
end
retry_on_temporary_error(_action, retries: 3) { || ... }
click to toggle source
# File lib/elastic/core/connector.rb, line 287
# Runs the block, running it again when it raises ServiceUnavailable or
# GatewayTimeout. Each retry is logged as a warning; once the retry budget is
# used up the error is re-raised. Any other error propagates immediately.
#
# @param _action [String] label used in the warning message
# @param retries [Integer] how many times the block may be re-run
# @yield the operation to attempt
# @return the value returned by the block
def retry_on_temporary_error(_action, retries: 3)
  attempts_left = retries

  begin
    yield
  rescue Elasticsearch::Transport::Transport::Errors::ServiceUnavailable,
         Elasticsearch::Transport::Transport::Errors::GatewayTimeout => exc
    raise if attempts_left <= 0

    Elastic.logger.warn("#{exc.class} error during '#{_action}', retrying!")
    attempts_left -= 1
    retry
  end
end
setup_index_mapping(_index)
click to toggle source
# File lib/elastic/core/connector.rb, line 258
# Applies @mapping to +_index+ under the DEFAULT_TYPE type.
#
# @param _index name of the index to update
# @return the put_mapping API response
def setup_index_mapping(_index)
  indices_api = api.indices
  indices_api.put_mapping(index: _index, type: DEFAULT_TYPE, body: @mapping)
end
transfer_alias(_alias, from: nil, to: nil)
click to toggle source
# File lib/elastic/core/connector.rb, line 262
# Moves an alias between indices with one update_aliases request: a "remove"
# action is included when +from+ is given and an "add" action when +to+ is
# given, in that order.
#
# @param _alias alias name
# @param from index to detach the alias from (optional)
# @param to index to attach the alias to (optional)
# @return the update_aliases API response
def transfer_alias(_alias, from: nil, to: nil)
  removal = ({ remove: { index: from, alias: _alias } } if from)
  addition = ({ add: { index: to, alias: _alias } } if to)

  api.indices.update_aliases(body: { actions: [removal, addition].compact })
end
transform_hit_to_create(_hit)
click to toggle source
# File lib/elastic/core/connector.rb, line 273
# Converts a search hit into a bulk "create" operation that carries the hit's
# '_id', the DEFAULT_TYPE type and the hit's '_source' as 'data'.
#
# @param _hit [Hash] a hit with '_id' and '_source' entries
# @return [Hash] { 'create' => { '_id', '_type', 'data' } }
def transform_hit_to_create(_hit)
  operation = {
    '_id' => _hit['_id'],
    '_type' => DEFAULT_TYPE,
    'data' => _hit['_source']
  }

  { 'create' => operation }
end
wait_for_index_to_stabilize()
click to toggle source
# File lib/elastic/core/connector.rb, line 170
# Sleeps for 1.2 times @settling_time, logging the wait first. Returns
# immediately when @settling_time is zero.
#
# @return the value returned by sleep, or nil when no wait was needed
def wait_for_index_to_stabilize
  unless @settling_time.zero?
    pause = @settling_time * 1.2
    Elastic.logger.info "Waiting #{pause}s for write indices to catch up ..."
    sleep(pause)
  end
end
write_index_alias()
click to toggle source
# File lib/elastic/core/connector.rb, line 199
# Name of the alias used for writes: #index_name followed by ".w".
# Computed once and memoized.
#
# @return [String]
def write_index_alias
  return @write_index_alias if @write_index_alias

  @write_index_alias = "#{index_name}.w"
end
write_index_thread_override()
click to toggle source
# File lib/elastic/core/connector.rb, line 195
# Key under which the per-thread write-index override is kept in
# Thread.current.
#
# @return [String] "_elastic_<index_name>_write_index"
def write_index_thread_override
  format('_elastic_%s_write_index', index_name)
end
write_indices()
click to toggle source
# File lib/elastic/core/connector.rb, line 181
# Indices that writes should target: the thread-local override when one is
# set, otherwise whatever #resolve_write_indices returns.
#
# @return [Array]
def write_indices
  override = Thread.current[write_index_thread_override]
  return override if override

  resolve_write_indices
end
write_indices_expired?()
click to toggle source
# File lib/elastic/core/connector.rb, line 214
# Tells whether the memoized write indices have outlived their expiration.
#
# @return [Boolean, nil] nil when no expiration has been recorded yet,
#   otherwise whether the recorded time is earlier than Time.current
def write_indices_expired?
  expiry = @write_indices_expiration
  return expiry unless expiry

  expiry < Time.current
end