module ElasticQueue::Persistence::ClassMethods

Public Instance Methods

add_mappings() click to toggle source
# File lib/elastic_queue/persistence.rb, line 70
# Sends the queue mapping of every model class to the search index.
#
# For each class in model_classes, reads klass.queue_mapping and, when the
# mapping is present, calls put_mapping on the client with the underscored
# class name as the document type. Classes without a mapping are skipped.
def add_mappings
  model_classes.each do |model_class|
    type_mapping = model_class.queue_mapping
    next unless type_mapping.present?

    search_client.indices.put_mapping(
      index: index_name,
      type: model_class.to_s.underscore,
      body: type_mapping
    )
  end
end
bulk_index(scopes: {}, batch_size: 10_000) click to toggle source

You can pass scopes into bulk_index; they are applied when fetching the records to index.

bulk_index(scopes: { some_model: [:scope1, :scope2], some_other_model: [:scope3] }) will fetch SomeModel.scope1.scope2 and SomeOtherModel.scope3 and index only those records.
# File lib/elastic_queue/persistence.rb, line 31
# Indexes the records of every model class through the client's bulk call.
#
# scopes     - Hash passed to scoped_class together with each model class to
#              narrow down which records are fetched (default: {}).
# batch_size - Number of records requested per find_in_batches batch
#              (default: 10_000).
#
# The index is created first when index_exists? reports it missing. One bulk
# request is sent per batch.
def bulk_index(scopes: {}, batch_size: 10_000)
  create_index unless index_exists?

  model_classes.each do |model_class|
    # NOTE(review): an earlier draft apparently eager-loaded associations here
    # via includes(associations_for_index(...)); that is not done currently.
    type_name = model_class.to_s.underscore
    records = scoped_class(model_class, scopes)

    records.find_in_batches(batch_size: batch_size) do |group|
      operations = group.map do |record|
        { index: { _index: index_name, _id: record.id, _type: type_name, data: record.indexed_for_queue } }
      end
      search_client.bulk(body: operations)
    end
  end
end
create_index() click to toggle source
# File lib/elastic_queue/persistence.rb, line 15
# Creates the index named index_name with default_index_settings as the
# request body, then registers the model mappings via add_mappings.
#
# Returns whatever add_mappings returns.
def create_index
  search_client.indices.create(index: index_name, body: default_index_settings)
  add_mappings
end
default_index_settings() click to toggle source
# File lib/elastic_queue/persistence.rb, line 54
# Builds the settings hash used as the body when the index is created.
#
# It declares a single analyzer named :default of type :custom that uses the
# :whitespace tokenizer followed by the :lowercase filter. A new hash is
# returned on every call.
def default_index_settings
  default_analyzer = { type: :custom, tokenizer: :whitespace, filter: [:lowercase] }

  { settings: { analysis: { analyzer: { default: default_analyzer } } } }
end
delete_index() click to toggle source
# File lib/elastic_queue/persistence.rb, line 20
# Asks the search client to delete the index named index_name and returns
# the client's response.
def delete_index
  search_client.indices.delete(index: index_name)
end
index_exists?() click to toggle source
# File lib/elastic_queue/persistence.rb, line 6
# Asks the search client whether the index named index_name exists and
# returns the client's answer.
def index_exists?
  search_client.indices.exists(index: index_name)
end
index_model(instance) click to toggle source

TODO: move these to an instance?

# File lib/elastic_queue/persistence.rb, line 78
# Indexes a single record.
#
# instance - Object responding to id and indexed_for_queue; its underscored
#            class name is used as the document type.
#
# Returns the search client's response to the index call.
def index_model(instance)
  search_client.index(
    index: index_name,
    id: instance.id,
    type: instance.class.to_s.underscore,
    body: instance.indexed_for_queue
  )
end
refresh_index() click to toggle source

Not currently used, but it is nice to have for debugging.

# File lib/elastic_queue/persistence.rb, line 25
# Issues a refresh request for the index named index_name and returns the
# client's response.
def refresh_index
  search_client.indices.refresh(index: index_name)
end
remove_model(instance) click to toggle source
# File lib/elastic_queue/persistence.rb, line 87
# Deletes the document belonging to a record from the index.
#
# instance - Object responding to id; its underscored class name is used as
#            the document type.
#
# Returns the client's response, or nil when the client raises NotFound.
# Any other error raised by the client propagates to the caller.
def remove_model(instance)
  search_client.delete(
    index: index_name,
    id: instance.id,
    type: instance.class.to_s.underscore
  )
rescue Elasticsearch::Transport::Transport::Errors::NotFound
  # A missing document is treated the same as a successful delete.
  nil
end
reset_index() click to toggle source
# File lib/elastic_queue/persistence.rb, line 10
# Rebuilds the index from scratch: deletes it when index_exists? reports it
# present, then calls create_index.
#
# Returns whatever create_index returns.
def reset_index
  already_there = index_exists?
  delete_index if already_there

  create_index
end
scoped_class(klass, scopes) click to toggle source
# File lib/elastic_queue/persistence.rb, line 46
# Applies the scopes configured for a model class, in order.
#
# klass  - The model class to start from.
# scopes - Hash keyed by the underscored, symbolized class name whose values
#          are lists of method names to send to the class one after another.
#
# Returns klass untouched when the hash has no (truthy) entry for it;
# otherwise returns the result of chaining every listed scope.
def scoped_class(klass, scopes)
  scope_names = scopes[klass.to_s.underscore.to_sym]
  return klass unless scope_names

  scope_names.inject(klass) { |relation, scope_name| relation.send(scope_name) }
end
upsert_model(instance) click to toggle source
# File lib/elastic_queue/persistence.rb, line 82
# Sends an update request for a record with doc_as_upsert enabled.
#
# instance - Object responding to id and indexed_for_queue; its underscored
#            class name is used as the document type.
#
# The request is sent with refresh: true and retry_on_conflict: 20.
# Returns the search client's response to the update call.
def upsert_model(instance)
  payload = { doc: instance.indexed_for_queue, doc_as_upsert: true }

  search_client.update(
    index: index_name,
    id: instance.id,
    type: instance.class.to_s.underscore,
    body: payload,
    refresh: true,
    retry_on_conflict: 20
  )
end