class Polipus::Storage::ElasticSearchStore

Constants

BINARY_FIELDS
DEFAULT_INDEX

Attributes

compress[RW]
except[RW]
index[RW]
index_name[RW]
refresh[RW]
semaphore[RW]

Public Class Methods

new(client, options = {}) click to toggle source
# File lib/polipus-elasticsearch/storage/elasticsearch_store.rb, line 15
def initialize(client, options = {})
  @index = options[:index] || options['index'] || DEFAULT_INDEX
  @index_name = options[:index_name] || options['index_name']
  @except = options[:except] || options['except'] || []
  @compress = options[:compress] || options['compress']
  @semaphore = Mutex.new
  @refresh = options[:refresh] || options['refresh'] || true
  index.setup(client, index_name)
  index.create_index!(index_name) unless index.index_exists?
end

Public Instance Methods

add(page) click to toggle source
# File lib/polipus-elasticsearch/storage/elasticsearch_store.rb, line 26
def add(page)
  semaphore.synchronize do
    obj = page.to_hash
    Array(except).each { |field| obj.delete(field.to_s) }
    BINARY_FIELDS.each do |field|
      next if obj[field.to_s].nil? || obj[field.to_s].empty?
      obj[field.to_s] = MultiJson.encode(obj[field.to_s]) if field.to_s == 'user_data'
      obj[field.to_s] = Base64.encode64(obj[field.to_s])
    end
    obj['id'] = uuid(page)
    obj['fetched_at'] = obj['fetched_at'].to_i
    begin
      index.store(obj, refresh)
    rescue Elasticsearch::Transport::Transport::Errors::Conflict => ex
      # you're trying to store an old version.
    end
  end
end
clear() click to toggle source
# File lib/polipus-elasticsearch/storage/elasticsearch_store.rb, line 45
def clear
  index.clear_index! if index.index_exists?
end
count() click to toggle source
# File lib/polipus-elasticsearch/storage/elasticsearch_store.rb, line 49
def count
  index.count
end
drop() click to toggle source
# File lib/polipus-elasticsearch/storage/elasticsearch_store.rb, line 53
def drop
  index.delete_index! if index.index_exists?
end
each() { |uuid(page), page| ... } click to toggle source
# File lib/polipus-elasticsearch/storage/elasticsearch_store.rb, line 57
def each
  # This method is implemented only for testing purposes
  response = index.client.search(
    index: index_name,
    body: {
      query: { match_all: {} },
      from: 0,
      size: 25
    }
  )
  response['hits']['hits'].each do |data|
    page = load_page(data['_source'])
    yield uuid(page), page
  end
end
exists?(page) click to toggle source
# File lib/polipus-elasticsearch/storage/elasticsearch_store.rb, line 73
def exists?(page)
  @semaphore.synchronize do
    index.exists?(uuid(page))
  end
end
get(page) click to toggle source
# File lib/polipus-elasticsearch/storage/elasticsearch_store.rb, line 79
def get(page)
  @semaphore.synchronize do
    load_page(index.get(uuid(page)))
  end
end
load_page(data) click to toggle source
# File lib/polipus-elasticsearch/storage/elasticsearch_store.rb, line 91
def load_page(data)
  return nil if data.nil?
  BINARY_FIELDS.each do |field|
    next if data[field.to_s].nil? || data[field.to_s].empty?
    data[field.to_s] = Base64.decode64(data[field.to_s])
    data[field.to_s] = MultiJson.decode(data[field.to_s]) if field.to_s == 'user_data'
  end
  page = Page.from_hash(data)
  page.fetched_at ||= 0
  page
end
remove(page) click to toggle source
# File lib/polipus-elasticsearch/storage/elasticsearch_store.rb, line 85
def remove(page)
  @semaphore.synchronize do
    index.remove(uuid(page), refresh)
  end
end