class Forklift::Connection::Elasticsearch
Public Class Methods
new(config, forklift)
click to toggle source
# File lib/forklift/transports/elasticsearch.rb, line 7
#
# Stores the connection settings and the owning forklift object for later use.
#
# @param config   [Object] settings later handed to ::Elasticsearch::Client.new by #connect
# @param forklift [Object] object exposing +logger+ and +config+ (used by #read, #write, #delete_index)
def initialize(config, forklift)
  @config = config
  @forklift = forklift
end
Public Instance Methods
config()
click to toggle source
# File lib/forklift/transports/elasticsearch.rb, line 12
#
# Reader for the settings object given to the constructor.
#
# @return [Object, nil] the value held in @config (nil when it was never set)
def config
  @config
end
connect()
click to toggle source
# File lib/forklift/transports/elasticsearch.rb, line 20
#
# Builds a new ::Elasticsearch::Client from the stored config and keeps it
# in @client.
#
# @return [Object] the newly created client
def connect
  settings = config
  @client = ::Elasticsearch::Client.new(settings)
end
delete_index(index)
click to toggle source
# File lib/forklift/transports/elasticsearch.rb, line 75
#
# Logs the request, then deletes +index+ if the client reports that it exists.
#
# @param index [Object] name of the index to remove
# @return [Object, nil] whatever the delete call returns, or nil when the
#   index does not exist
def delete_index(index)
  forklift.logger.debug " ELASTICSEARCH (delete index): #{index}"

  # Nothing to delete when the index is absent.
  return unless client.indices.exists({ index: index })

  client.indices.delete({ index: index })
end
disconnect()
click to toggle source
# File lib/forklift/transports/elasticsearch.rb, line 24
#
# Drops the reference to the current client by resetting @client to nil.
#
# @return [nil]
def disconnect
  @client = nil
end
forklift()
click to toggle source
# File lib/forklift/transports/elasticsearch.rb, line 16
#
# Reader for the forklift object given to the constructor.
#
# @return [Object, nil] the value held in @forklift (nil when it was never set)
def forklift
  @forklift
end
read(index, query, looping=true, from=0, size=forklift.config[:batch_size]) { |data| ... }
click to toggle source
# File lib/forklift/transports/elasticsearch.rb, line 28
#
# Runs +query+ against +index+ in pages of +size+ hits and hands back the
# "_source" of every hit, with keys symbolized in place via symbolize_keys!.
#
# With a block, each page is yielded in turn. When +looping+ is true, paging
# continues until a page comes back with no hits; that final empty page is
# still yielded. When +looping+ is anything else, only one page is fetched.
# Without a block, the first page is returned and no further pages are read.
#
# The caller's +query+ hash is never modified: the paging keys are merged
# into a fresh hash for every request.
#
# @param index   [Object] index to search
# @param query   [Hash] search body; :from and :size are supplied per page
# @param looping [Boolean] keep paging (only when exactly +true+)
# @param from    [Integer] offset of the first hit to fetch
# @param size    [Integer] page size; defaults to forklift.config[:batch_size]
# @yieldparam data [Array<Hash>] the sources of one page
# @return [Array<Hash>, nil] the first page when no block is given, else nil
def read(index, query, looping=true, from=0, size=forklift.config[:batch_size])
  offset = 0

  loop do
    # Build a new request body instead of writing :from/:size into the
    # caller's hash.
    prepared_query = query.merge(from: from + offset, size: size)
    forklift.logger.debug " ELASTICSEARCH: #{prepared_query.to_json}"

    results = client.search({ index: index, body: prepared_query })
    hits = results["hits"]["hits"]

    data = hits.map { |hit| hit["_source"] }
    data.each(&:symbolize_keys!)

    return data unless block_given?

    yield data

    # Stop after a single page unless looping, or once a page is empty.
    break if looping != true || hits.empty?

    offset += size
  end
end
write(data, index, update=false, type='forklift', primary_key=:id)
click to toggle source
# File lib/forklift/transports/elasticsearch.rb, line 58
#
# Indexes every record in +data+ into +index+, one client.index call per
# record, then refreshes the index. Record keys are symbolized in place
# (symbolize_keys!) before indexing.
#
# @param data        [Array<Hash>] records to store
# @param index       [Object] target index
# @param update      [Boolean] when exactly +true+, a record's +primary_key+
#   value (if not nil) is sent as the document id
# @param type        [String] document type sent with each record
# @param primary_key [Symbol] record key holding the document id
# @return [Object] whatever the refresh call returns
def write(data, index, update=false, type='forklift', primary_key=:id)
  data.each(&:symbolize_keys!)

  data.each do |record|
    document = { index: index, body: record, type: type }

    # Only pin the document id when updating and the record carries one.
    key = record[primary_key]
    document[:id] = key if update == true && !key.nil?

    forklift.logger.debug " ELASTICSEARCH (store): #{document.to_json}"
    client.index(document)
  end

  client.indices.refresh({ index: index })
end