module Sequel::Plugins::Elasticsearch::ClassMethods
The class methods that will be added to the Sequel::Model
Attributes
The Elasticsearch index to which the documents will be written.
The extra options that will be passed to the Elasticsearch client.
The Elasticsearch type to which the documents will be written.
Public Instance Methods
Remove previous aliases and point the `elasticsearch_index` to the new index.
# File lib/sequel/plugins/elasticsearch.rb, line 129
def alias_index(new_index)
  es_client.indices.update_aliases body: {
    actions: [
      { remove: { index: "#{elasticsearch_index}*", alias: elasticsearch_index } },
      { add: { index: new_index, alias: elasticsearch_index } }
    ]
  }
end
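To make the alias swap concrete, the sketch below builds the same `actions` payload without contacting a cluster. `alias_actions` is a hypothetical helper that is not part of the plugin, and the index names are made-up examples. Elasticsearch applies the actions of a single update-aliases request atomically, so the alias should never be left pointing at nothing.

```ruby
# Hypothetical helper (not part of the plugin): builds the request body that
# alias_index passes to indices.update_aliases.
def alias_actions(alias_name, new_index)
  {
    actions: [
      # Detach the alias from every index whose name starts with the alias name.
      { remove: { index: "#{alias_name}*", alias: alias_name } },
      # Attach the alias to the freshly built index.
      { add: { index: new_index, alias: alias_name } }
    ]
  }
end

# Example values are assumptions chosen for illustration only.
body = alias_actions(:documents, :'documents-20191004.123456')
puts body.inspect
```

Because the `remove` action uses the `"<alias>*"` wildcard, any index whose name merely starts with the alias name is matched, which is worth keeping in mind when choosing index names.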
Wrapper method in which error handling is done for Elasticsearch calls.
# File lib/sequel/plugins/elasticsearch.rb, line 71
def call_es
  yield
rescue ::Elasticsearch::Transport::Transport::Errors::NotFound,
       ::Elasticsearch::Transport::Transport::Error,
       Faraday::ConnectionFailed => e
  db.loggers.first.warn e if db.loggers.count.positive?
  nil
end
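The rescue-log-return-nil pattern used by this wrapper can be tried in isolation. The following is a minimal sketch under stated assumptions: `FakeNotFound` stands in for the Elasticsearch transport errors, `safe_call` is a hypothetical name, and a plain array of `Logger` objects plays the role of `db.loggers`.

```ruby
require 'logger'
require 'stringio'

# Stand-in for the Elasticsearch transport errors; an assumption for this sketch.
class FakeNotFound < StandardError; end

# Re-creation of the call_es pattern: yield to the block and, when a known
# error is raised, log a warning (if any logger is configured) and return nil.
def safe_call(loggers)
  yield
rescue FakeNotFound => e
  loggers.first.warn e if loggers.count.positive?
  nil
end

log_output = StringIO.new
loggers = [Logger.new(log_output)]

ok = safe_call(loggers) { :result }                                  # block value passes through
failed = safe_call(loggers) { raise FakeNotFound, 'index missing' }  # error is swallowed

puts ok.inspect
puts failed.inspect
puts log_output.string
```

Callers of the "safe" methods therefore need to handle a `nil` return value, since that is the only signal that the request failed.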
Execute a search or a scroll on the Model's Elasticsearch index. This method is “safe” in that it will catch the more common Errors.
# File lib/sequel/plugins/elasticsearch.rb, line 66
def es(query = '', opts = {})
  call_es { query.is_a?(Result) ? scroll!(query, opts) : es!(query, opts) }
end
Execute a search on the Model's Elasticsearch index without catching Errors.
# File lib/sequel/plugins/elasticsearch.rb, line 47
def es!(query = '', opts = {})
  opts = {
    index: elasticsearch_index,
    type: elasticsearch_type
  }.merge(opts)
  query.is_a?(String) ? opts[:q] = query : opts[:body] = query
  Result.new es_client.search(opts), self
end
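The way `es!` turns its arguments into search options can be illustrated without a client. This is a sketch, assuming a hypothetical `build_search_opts` helper and made-up default index and type values; it mirrors only the option handling, not the search call itself.

```ruby
# Assumed defaults for illustration; in the plugin these come from
# elasticsearch_index and elasticsearch_type.
DEFAULT_SEARCH_OPTS = { index: :documents, type: :document }.freeze

# Hypothetical helper mirroring the option handling in es!.
def build_search_opts(query, opts = {})
  merged = DEFAULT_SEARCH_OPTS.merge(opts) # caller-supplied options win
  if query.is_a?(String)
    merged[:q] = query    # query-string search
  else
    merged[:body] = query # structured query DSL
  end
  merged
end

puts build_search_opts('title:ruby').inspect
puts build_search_opts({ query: { match_all: {} } }, size: 5).inspect
```

A String query ends up under `:q`, while any other object (typically a Hash in the query DSL) ends up under `:body`; options passed by the caller override the model-level index and type.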
Return the Elasticsearch client used to communicate with the cluster.
# File lib/sequel/plugins/elasticsearch.rb, line 42
def es_client
  @es_client = ::Elasticsearch::Client.new elasticsearch_opts
end
Import the whole dataset into Elasticsearch.
This assumes that a template that covers all the possible index names has been created. See timestamped_index for examples of the indices that will be created.
This adds or updates records in the last index created by this utility. Use the reindex! method to create a completely new index and alias.
# File lib/sequel/plugins/elasticsearch.rb, line 88
def import!(index: nil, dataset: nil, batch_size: 100)
  dataset ||= self.dataset
  index_name = index || last_index || elasticsearch_index

  # Index all the documents
  body = []
  dataset.each_page(batch_size) do |ds|
    body = []
    ds.all.each do |row|
      print '.'
      body << { update: import_object(index_name, row) }
    end
    puts '/'
    es_client.bulk body: body
    body = nil
  end
end
# File lib/sequel/plugins/elasticsearch.rb, line 106
def import_object(idx, row)
  val = {
    _index: idx,
    _id: row.document_id,
    data: { doc: row.as_indexed_json, doc_as_upsert: true }
  }
  val[:_type] = elasticsearch_type if elasticsearch_type
  val
end
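The shape of the bulk payload that `import!` assembles from `import_object` may be easier to see with plain Ruby objects. The sketch below is an illustration under assumptions: `Row` is a stand-in for a model instance that responds to `document_id` and `as_indexed_json`, `bulk_entry` is a hypothetical helper, and the index name is invented.

```ruby
# Stand-in for a model row; an assumption for this sketch. Real rows provide
# document_id and as_indexed_json themselves.
Row = Struct.new(:id, :title) do
  def document_id
    id
  end

  def as_indexed_json
    { title: title }
  end
end

# Hypothetical helper combining import_object with the { update: ... } wrapper
# that import! adds around each entry.
def bulk_entry(index_name, row, type = nil)
  meta = {
    _index: index_name,
    _id: row.document_id,
    # doc_as_upsert makes the update create the document when it is missing.
    data: { doc: row.as_indexed_json, doc_as_upsert: true }
  }
  meta[:_type] = type if type # only sent when a type is configured
  { update: meta }
end

rows = [Row.new(1, 'First'), Row.new(2, 'Second')]
body = rows.map { |row| bulk_entry(:'documents-20191004.123456', row) }
puts body.inspect
```

Each batch becomes one array of `update` actions, so re-running an import against the same index updates existing documents rather than duplicating them.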
Find the last created index that matches the specified index name.
# File lib/sequel/plugins/elasticsearch.rb, line 139
def last_index
  es_client.indices.get_alias(name: elasticsearch_index)&.keys&.sort&.first
rescue ::Elasticsearch::Transport::Transport::Errors::NotFound
  nil
end
Creates a new index in Elasticsearch from the specified dataset, as well as an alias to the new index.
See the documentation on import! for more details.
# File lib/sequel/plugins/elasticsearch.rb, line 120
def reindex!(index: nil, dataset: nil, batch_size: 100)
  index_name = index || timestamped_index
  import!(index: index_name, dataset: dataset, batch_size: batch_size)

  # Create an alias to the newly created index
  alias_index(index_name)
end
Fetch the next page in a scroll without catching Errors.
# File lib/sequel/plugins/elasticsearch.rb, line 57
def scroll!(scroll_id, duration)
  scroll_id = scroll_id.scroll_id if scroll_id.is_a? Result
  return nil unless scroll_id

  Result.new es_client.scroll(body: scroll_id, scroll: duration), self
end
Generate a timestamped index name. This will use the current timestamp to construct index names like this:
base-name-20191004.123456
# File lib/sequel/plugins/elasticsearch.rb, line 149
def timestamped_index
  time_str = Time.now.strftime('%Y%m%d.%H%M%S') # TODO: Make the format configurable
  "#{elasticsearch_index}-#{time_str}".to_sym
end
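The naming scheme can be reproduced with a fixed time to see exactly what is generated. This is a small sketch: `timestamped_name` is a hypothetical stand-alone version of the method that takes the base name and the time as arguments, so the result does not depend on the clock.

```ruby
# Hypothetical stand-alone variant of timestamped_index; the base name and the
# time are passed in explicitly for illustration.
def timestamped_name(base, now = Time.now)
  "#{base}-#{now.strftime('%Y%m%d.%H%M%S')}".to_sym
end

name = timestamped_name(:'base-name', Time.utc(2019, 10, 4, 12, 34, 56))
puts name.inspect # => :"base-name-20191004.123456"
```

Because every field in the format is zero-padded and fixed-width, sorting these names as strings also sorts them chronologically.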