class Forklift::Connection::Elasticsearch

Public Class Methods

new(config, forklift) click to toggle source
# File lib/forklift/transports/elasticsearch.rb, line 7
def initialize(config, forklift)
  @config = config
  @forklift = forklift
end

Public Instance Methods

config() click to toggle source
# File lib/forklift/transports/elasticsearch.rb, line 12
def config
  @config
end
connect() click to toggle source
# File lib/forklift/transports/elasticsearch.rb, line 20
def connect
  @client = ::Elasticsearch::Client.new(config)
end
delete_index(index) click to toggle source
# File lib/forklift/transports/elasticsearch.rb, line 75
def delete_index(index)
  forklift.logger.debug "    ELASTICSEARCH (delete index): #{index}"
  client.indices.delete({ index: index }) if client.indices.exists({ index: index })
end
disconnect() click to toggle source
# File lib/forklift/transports/elasticsearch.rb, line 24
def disconnect
  @client = nil
end
forklift() click to toggle source
# File lib/forklift/transports/elasticsearch.rb, line 16
def forklift
  @forklift
end
read(index, query, looping=true, from=0, size=forklift.config[:batch_size]) { |data| ... } click to toggle source
# File lib/forklift/transports/elasticsearch.rb, line 28
def read(index, query, looping=true, from=0, size=forklift.config[:batch_size])
  offset = 0
  loop_count = 0

  while (looping == true || loop_count == 0)
    data = []
    prepared_query = query
    prepared_query[:from] = from + offset
    prepared_query[:size] = size

    forklift.logger.debug "    ELASTICSEARCH: #{query.to_json}"
    results = client.search( { index: index, body: prepared_query } )
    results["hits"]["hits"].each do |hit|
      data << hit["_source"]
    end

    data.map{|l| l.symbolize_keys! }

    if block_given?
      yield data
    else
      return data
    end

    looping = false if results["hits"]["hits"].length == 0
    offset = offset + size
    loop_count = loop_count + 1
  end
end
write(data, index, update=false, type='forklift', primary_key=:id) click to toggle source
# File lib/forklift/transports/elasticsearch.rb, line 58
def write(data, index, update=false, type='forklift', primary_key=:id)
  data.map{|l| l.symbolize_keys! }

  data.each do |d|
    object = {
      index:  index,
      body:   d,
      type:   type,
    }
    object[:id] = d[primary_key] if ( !d[primary_key].nil? && update == true )

    forklift.logger.debug "    ELASTICSEARCH (store): #{object.to_json}"
    client.index object
  end
  client.indices.refresh({ index: index })
end