class Searchkick::Index
Constants
- EXCLUDED_ATTRIBUTES
Attributes
name[R]
options[R]
Public Class Methods
new(name, options = {})
click to toggle source
# File lib/searchkick/index.rb, line 7 def initialize(name, options = {}) @name = name @options = options @klass_document_type = {} # cache end
Public Instance Methods
alias_exists?()
click to toggle source
# File lib/searchkick/index.rb, line 35 def alias_exists? client.indices.exists_alias name: name end
all_indices(unaliased: false)
click to toggle source
# File lib/searchkick/index.rb, line 188 def all_indices(unaliased: false) indices = begin client.indices.get_aliases rescue Elasticsearch::Transport::Transport::Errors::NotFound {} end indices = indices.select { |_k, v| v.empty? || v["aliases"].empty? } if unaliased indices.select { |k, _v| k =~ /\A#{Regexp.escape(name)}_\d{14,17}\z/ }.keys end
batches_left()
click to toggle source
# File lib/searchkick/index.rb, line 311 def batches_left Searchkick.with_redis { |r| r.scard(batches_key) } end
bulk_delete(records)
click to toggle source
# File lib/searchkick/index.rb, line 89 def bulk_delete(records) bulk_delete_helper(records) end
bulk_index(records)
click to toggle source
# File lib/searchkick/index.rb, line 93 def bulk_index(records) bulk_index_helper(records) end
Also aliased as: import
bulk_update(records, method_name)
click to toggle source
# File lib/searchkick/index.rb, line 98 def bulk_update(records, method_name) bulk_update_helper(records, method_name) end
clean_indices()
click to toggle source
remove old indices that start w/ index_name
# File lib/searchkick/index.rb, line 200 def clean_indices indices = all_indices(unaliased: true) indices.each do |index| Searchkick::Index.new(index).delete end indices end
create(body = {})
click to toggle source
# File lib/searchkick/index.rb, line 13 def create(body = {}) client.indices.create index: name, body: body end
create_index(index_options: nil)
click to toggle source
reindex
# File lib/searchkick/index.rb, line 181 def create_index(index_options: nil) index_options ||= self.index_options index = Searchkick::Index.new("#{name}_#{Time.now.strftime('%Y%m%d%H%M%S%L')}", @options) index.create(index_options) index end
delete()
click to toggle source
# File lib/searchkick/index.rb, line 17 def delete if !Searchkick.server_below?("6.0.0-alpha1") && alias_exists? # can't call delete directly on aliases in ES 6 indices = client.indices.get_alias(name: name).keys client.indices.delete index: indices else client.indices.delete index: name end end
exists?()
click to toggle source
# File lib/searchkick/index.rb, line 27 def exists? client.indices.exists index: name end
import_scope(scope, resume: false, method_name: nil, async: false, batch: false, batch_id: nil, full: false)
click to toggle source
# File lib/searchkick/index.rb, line 281 def import_scope(scope, resume: false, method_name: nil, async: false, batch: false, batch_id: nil, full: false) # use scope for import scope = scope.search_import if scope.respond_to?(:search_import) if batch import_or_update scope.to_a, method_name, async Searchkick.with_redis { |r| r.srem(batches_key, batch_id) } if batch_id elsif full && async full_reindex_async(scope) elsif scope.respond_to?(:find_in_batches) if resume # use total docs instead of max id since there's not a great way # to get the max _id without scripting since it's a string # TODO use primary key and prefix with table name scope = scope.where("id > ?", total_docs) end scope = scope.select("id").except(:includes, :preload) if async scope.find_in_batches batch_size: batch_size do |batch| import_or_update batch, method_name, async end else each_batch(scope) do |items| import_or_update items, method_name, async end end end
klass_document_type(klass)
click to toggle source
# File lib/searchkick/index.rb, line 321 def klass_document_type(klass) @klass_document_type[klass] ||= begin if klass.respond_to?(:document_type) klass.document_type else klass.model_name.to_s.underscore end end end
mapping()
click to toggle source
# File lib/searchkick/index.rb, line 39 def mapping client.indices.get_mapping index: name end
promote(new_name, update_refresh_interval: false)
click to toggle source
# File lib/searchkick/index.rb, line 55 def promote(new_name, update_refresh_interval: false) if update_refresh_interval new_index = Searchkick::Index.new(new_name) settings = options[:settings] || {} refresh_interval = (settings[:index] && settings[:index][:refresh_interval]) || "1s" new_index.update_settings(index: {refresh_interval: refresh_interval}) end old_indices = begin client.indices.get_alias(name: name).keys rescue Elasticsearch::Transport::Transport::Errors::NotFound {} end actions = old_indices.map { |old_name| {remove: {index: old_name, alias: name}} } + [{add: {index: new_name, alias: name}}] client.indices.update_aliases body: {actions: actions} end
Also aliased as: swap
record_data(r)
click to toggle source
# File lib/searchkick/index.rb, line 102 def record_data(r) data = { _index: name, _id: search_id(r), _type: document_type(r) } data[:_routing] = r.search_routing if r.respond_to?(:search_routing) data end
refresh()
click to toggle source
# File lib/searchkick/index.rb, line 31 def refresh client.indices.refresh index: name end
refresh_interval()
click to toggle source
# File lib/searchkick/index.rb, line 47 def refresh_interval settings.values.first["settings"]["index"]["refresh_interval"] end
reindex_queue()
click to toggle source
queue
# File lib/searchkick/index.rb, line 162 def reindex_queue Searchkick::ReindexQueue.new(name) end
reindex_record(record)
click to toggle source
# File lib/searchkick/index.rb, line 120 def reindex_record(record) if record.destroyed? || !record.should_index? begin remove(record) rescue Elasticsearch::Transport::Transport::Errors::NotFound # do nothing end else store(record) end end
reindex_record_async(record)
click to toggle source
# File lib/searchkick/index.rb, line 132 def reindex_record_async(record) if Searchkick.callbacks_value.nil? if defined?(Searchkick::ReindexV2Job) Searchkick::ReindexV2Job.perform_later(record.class.name, record.id.to_s) else raise Searchkick::Error, "Active Job not found" end else reindex_record(record) end end
reindex_scope(scope, import: true, resume: false, retain: false, async: false, refresh_interval: nil)
click to toggle source
gist.github.com/jarosan/3124884 www.elasticsearch.org/blog/changing-mapping-with-zero-downtime/
# File lib/searchkick/index.rb, line 223 def reindex_scope(scope, import: true, resume: false, retain: false, async: false, refresh_interval: nil) if resume index_name = all_indices.sort.last raise Searchkick::Error, "No index to resume" unless index_name index = Searchkick::Index.new(index_name) else clean_indices unless retain index_options = scope.searchkick_index_options index_options.deep_merge!(settings: {index: {refresh_interval: refresh_interval}}) if refresh_interval index = create_index(index_options: index_options) end # check if alias exists alias_exists = alias_exists? if alias_exists # import before promotion index.import_scope(scope, resume: resume, async: async, full: true) if import # get existing indices to remove unless async promote(index.name, update_refresh_interval: !refresh_interval.nil?) clean_indices unless retain end else delete if exists? promote(index.name, update_refresh_interval: !refresh_interval.nil?) # import after promotion index.import_scope(scope, resume: resume, async: async, full: true) if import end if async if async.is_a?(Hash) && async[:wait] puts "Created index: #{index.name}" puts "Jobs queued. Waiting..." loop do sleep 3 status = Searchkick.reindex_status(index.name) break if status[:completed] puts "Batches left: #{status[:batches_left]}" end # already promoted if alias didn't exist if alias_exists puts "Jobs complete. Promoting..." promote(index.name, update_refresh_interval: !refresh_interval.nil?) end clean_indices unless retain puts "SUCCESS!" end {index_name: index.name} else index.refresh true end end
remove(record)
click to toggle source
# File lib/searchkick/index.rb, line 81 def remove(record) bulk_delete_helper([record]) end
retrieve(record)
click to toggle source
# File lib/searchkick/index.rb, line 112 def retrieve(record) client.get( index: name, type: document_type(record), id: search_id(record) )["_source"] end
search_model(searchkick_klass, term = "*", **options) { |body| ... }
click to toggle source
TODO remove in next major version
# File lib/searchkick/index.rb, line 169 def search_model(searchkick_klass, term = "*", **options, &block) query = Searchkick::Query.new(searchkick_klass, term, options) yield(query.body) if block if options[:execute] == false query else query.execute end end
settings()
click to toggle source
# File lib/searchkick/index.rb, line 43 def settings client.indices.get_settings index: name end
similar_record(record, **options)
click to toggle source
# File lib/searchkick/index.rb, line 144 def similar_record(record, **options) like_text = retrieve(record).to_hash .keep_if { |k, _| !options[:fields] || options[:fields].map(&:to_s).include?(k) } .values.compact.join(" ") # TODO deep merge method options[:where] ||= {} options[:where][:_id] ||= {} options[:where][:_id][:not] = record.id.to_s options[:per_page] ||= 10 options[:similar] = true # TODO use index class instead of record class search_model(record.class, like_text, options) end
store(record)
click to toggle source
record based use helpers for notifications
# File lib/searchkick/index.rb, line 77 def store(record) bulk_index_helper([record]) end
tokens(text, options = {})
click to toggle source
other
# File lib/searchkick/index.rb, line 317 def tokens(text, options = {}) client.indices.analyze(body: {text: text}.merge(options), index: name)["tokens"].map { |t| t["token"] } end
total_docs()
click to toggle source
# File lib/searchkick/index.rb, line 208 def total_docs response = client.search( index: name, body: { query: {match_all: {}}, size: 0 } ) response["hits"]["total"] end
update_record(record, method_name)
click to toggle source
# File lib/searchkick/index.rb, line 85 def update_record(record, method_name) bulk_update_helper([record], method_name) end
update_settings(settings)
click to toggle source
# File lib/searchkick/index.rb, line 51 def update_settings(settings) client.indices.put_settings index: name, body: settings end
Protected Instance Methods
batch_size()
click to toggle source
# File lib/searchkick/index.rb, line 502 def batch_size @batch_size ||= @options[:batch_size] || 1000 end
batches_key()
click to toggle source
# File lib/searchkick/index.rb, line 532 def batches_key "searchkick:reindex:#{name}:batches" end
bulk_delete_helper(records)
click to toggle source
# File lib/searchkick/index.rb, line 524 def bulk_delete_helper(records) Searchkick.indexer.queue(records.reject { |r| r.id.blank? }.map { |r| {delete: record_data(r)} }) end
bulk_index_helper(records)
click to toggle source
# File lib/searchkick/index.rb, line 520 def bulk_index_helper(records) Searchkick.indexer.queue(records.map { |r| {index: record_data(r).merge(data: search_data(r))} }) end
bulk_reindex_job(scope, batch_id, options)
click to toggle source
# File lib/searchkick/index.rb, line 493 def bulk_reindex_job(scope, batch_id, options) Searchkick::BulkReindexJob.perform_later({ class_name: scope.model_name.name, index_name: name, batch_id: batch_id }.merge(options)) Searchkick.with_redis { |r| r.sadd(batches_key, batch_id) } end
bulk_update_helper(records, method_name)
click to toggle source
# File lib/searchkick/index.rb, line 528 def bulk_update_helper(records, method_name) Searchkick.indexer.queue(records.map { |r| {update: record_data(r).merge(data: {doc: search_data(r, method_name)})} }) end
cast_big_decimal(obj)
click to toggle source
change all BigDecimal values to floats due to github.com/rails/rails/issues/6033 possible loss of precision :/
# File lib/searchkick/index.rb, line 407 def cast_big_decimal(obj) case obj when BigDecimal obj.to_f when Hash obj.each do |k, v| obj[k] = cast_big_decimal(v) end when Enumerable obj.map do |v| cast_big_decimal(v) end else obj end end
client()
click to toggle source
# File lib/searchkick/index.rb, line 333 def client Searchkick.client end
document_type(record)
click to toggle source
# File lib/searchkick/index.rb, line 337 def document_type(record) if record.respond_to?(:search_document_type) record.search_document_type else klass_document_type(record.class) end end
each_batch(scope) { |items| ... }
click to toggle source
# File lib/searchkick/index.rb, line 479 def each_batch(scope) # https://github.com/karmi/tire/blob/master/lib/tire/model/import.rb # use cursor for Mongoid items = [] scope.all.each do |item| items << item if items.length == batch_size yield items items = [] end end yield items if items.any? end
full_reindex_async(scope)
click to toggle source
# File lib/searchkick/index.rb, line 444 def full_reindex_async(scope) if scope.respond_to?(:primary_key) # TODO expire Redis key primary_key = scope.primary_key starting_id = scope.minimum(primary_key) if starting_id.nil? # no records, do nothing elsif starting_id.is_a?(Numeric) max_id = scope.maximum(primary_key) batches_count = ((max_id - starting_id + 1) / batch_size.to_f).ceil batches_count.times do |i| batch_id = i + 1 min_id = starting_id + (i * batch_size) bulk_reindex_job scope, batch_id, min_id: min_id, max_id: min_id + batch_size - 1 end else scope.find_in_batches(batch_size: batch_size).each_with_index do |batch, i| batch_id = i + 1 bulk_reindex_job scope, batch_id, record_ids: batch.map { |record| record.id.to_s } end end else batch_id = 1 # TODO remove any eager loading scope = scope.only(:_id) if scope.respond_to?(:only) each_batch(scope) do |items| bulk_reindex_job scope, batch_id, record_ids: items.map { |i| i.id.to_s } batch_id += 1 end end end
import_or_update(records, method_name, async)
click to toggle source
# File lib/searchkick/index.rb, line 424 def import_or_update(records, method_name, async) if records.any? if async Searchkick::BulkReindexJob.perform_later( class_name: records.first.class.name, record_ids: records.map(&:id), index_name: name, method_name: method_name ? method_name.to_s : nil ) else records = records.select(&:should_index?) if records.any? with_retries do method_name ? bulk_update(records, method_name) : import(records) end end end end end
location_value(value)
click to toggle source
# File lib/searchkick/index.rb, line 394 def location_value(value) if value.is_a?(Array) value.map(&:to_f).reverse elsif value.is_a?(Hash) {lat: value[:lat].to_f, lon: value[:lon].to_f} else value end end
search_data(record, method_name = nil)
click to toggle source
# File lib/searchkick/index.rb, line 352 def search_data(record, method_name = nil) partial_reindex = !method_name.nil? options = record.class.searchkick_options # remove _id since search_id is used instead source = record.send(method_name || :search_data).each_with_object({}) { |(k, v), memo| memo[k.to_s] = v; memo }.except(*EXCLUDED_ATTRIBUTES) # conversions if options[:conversions] Array(options[:conversions]).map(&:to_s).each do |conversions_field| if source[conversions_field] source[conversions_field] = source[conversions_field].map { |k, v| {query: k, count: v} } end end end # hack to prevent generator field doesn't exist error if options[:suggest] options[:suggest].map(&:to_s).each do |field| source[field] = nil if !source[field] && !partial_reindex end end # locations if options[:locations] options[:locations].map(&:to_s).each do |field| if source[field] if !source[field].is_a?(Hash) && (source[field].first.is_a?(Array) || source[field].first.is_a?(Hash)) # multiple locations source[field] = source[field].map { |a| location_value(a) } else source[field] = location_value(source[field]) end end end end cast_big_decimal(source) source end
search_id(record)
click to toggle source
# File lib/searchkick/index.rb, line 345 def search_id(record) id = record.respond_to?(:search_document_id) ? record.search_document_id : record.id id.is_a?(Numeric) ? id : id.to_s end
with_retries() { || ... }
click to toggle source
# File lib/searchkick/index.rb, line 506 def with_retries retries = 0 begin yield rescue Faraday::ClientError => e if retries < 1 retries += 1 retry end raise e end end