class Elastictastic::BulkPersistenceStrategy
Constants
- DEFAULT_HANDLER
- Operation
Public Class Methods
new(options)
click to toggle source
# File lib/elastictastic/bulk_persistence_strategy.rb, line 8
# Starts with an empty operation queue and an empty per-document index.
#
# options - Hash. Its :auto_flush entry is removed (the caller's hash is
#           mutated) and kept as the queue length at which `add` triggers
#           a flush; nil/false leaves automatic flushing off.
def initialize(options)
  @operations, @operations_by_id = [], {}
  @auto_flush = options.delete(:auto_flush)
end
Public Instance Methods
create(instance, params = {}, &block)
click to toggle source
# File lib/elastictastic/bulk_persistence_strategy.rb, line 14
# Queues a bulk 'create' command for +instance+.
#
# instance - document to create; marked pending-save before queueing.
# params   - accepted for interface compatibility; not used here.
# block    - called with no arguments on success, or with a ServerError
#            built from the response's error on failure. Defaults to
#            DEFAULT_HANDLER.
#
# Raises Elastictastic::OperationNotAllowed when the instance already has
# a save pending.
def create(instance, params = {}, &block)
  handler = block || DEFAULT_HANDLER
  if instance.pending_save?
    raise Elastictastic::OperationNotAllowed,
          "Can't re-save transient document with pending save in bulk operation"
  end
  instance.pending_save!

  index = instance.index
  id = instance.id
  header = { 'create' => bulk_identifier_for_instance(instance) }
  document = instance.elasticsearch_doc

  add(index, id, header, document) do |response|
    error = response['create']['error']
    next handler.call(ServerError[error]) if error

    # Copy the server-assigned identity back onto the instance.
    outcome = response['create']
    instance.id = outcome['_id']
    instance.version = outcome['_version']
    instance.persisted!
    handler.call
  end
end
destroy(instance, &block)
click to toggle source
# File lib/elastictastic/bulk_persistence_strategy.rb, line 56
# Queues a bulk delete command for +instance+.
#
# instance - document to delete; marked pending-destroy before queueing.
# block    - called with no arguments on success, or with a ServerError
#            built from the response's error on failure. Defaults to
#            DEFAULT_HANDLER.
def destroy(instance, &block)
  handler = block || DEFAULT_HANDLER
  instance.pending_destroy!

  index = instance.index
  id = instance.id
  command = { :delete => bulk_identifier_for_instance(instance) }

  add(index, id, command) do |response|
    error = response['delete']['error']
    next handler.call(ServerError[error]) if error

    instance.transient!
    instance.version = response['delete']['_version']
    handler.call
  end
end
destroy!(index, type, id, routing, parent)
click to toggle source
# File lib/elastictastic/bulk_persistence_strategy.rb, line 70
# Queues a bulk delete command for a document identified by its raw
# coordinates rather than by an instance. No response handler is
# registered and no version is sent.
def destroy!(index, type, id, routing, parent)
  target = bulk_identifier(index, type, id, routing, parent, nil)
  add(index, id, { :delete => target })
end
flush()
click to toggle source
# File lib/elastictastic/bulk_persistence_strategy.rb, line 77
# Sends every queued, non-skipped operation to the server in a single
# bulk request and dispatches each response item to its handler.
#
# The queue and the per-document index are both emptied before the
# request is made. Previously only the queue was cleared, so the index
# kept a reference to every operation (and its handler) ever queued and
# grew without bound when auto-flushing.
#
# Returns nil when nothing is queued, otherwise the bulk response.
def flush
  return if @operations.empty?

  params = {}
  params[:refresh] = true if Elastictastic.config.auto_refresh

  # Snapshot what will be sent, then reset state so that handlers which
  # queue further operations start from a clean slate.
  operations = @operations.reject { |operation| operation.skip }
  @operations.clear
  @operations_by_id.clear

  io = StringIO.new
  operations.each do |operation|
    operation.commands.each do |command|
      io.puts Elastictastic.json_encode(command)
    end
  end
  response = Elastictastic.client.bulk(io.string, params)

  # Response items are matched to operations by position.
  response['items'].each_with_index do |op_response, i|
    handler = operations[i].handler
    handler.call(op_response) if handler
  end
  response
end
update(instance, &block)
click to toggle source
# File lib/elastictastic/bulk_persistence_strategy.rb, line 38
# Queues a bulk 'index' command that re-saves +instance+.
#
# instance - document to update; marked pending-save before queueing.
# block    - called with no arguments on success, or with a ServerError
#            built from the response's error on failure. Defaults to
#            DEFAULT_HANDLER.
def update(instance, &block)
  handler = block || DEFAULT_HANDLER
  instance.pending_save!

  index = instance.index
  id = instance.id
  header = { 'index' => bulk_identifier_for_instance(instance) }
  document = instance.elasticsearch_doc

  add(index, id, header, document) do |response|
    error = response['index']['error']
    next handler.call(ServerError[error]) if error

    instance.version = response['index']['_version']
    handler.call
  end
end
Private Instance Methods
add(index, id, *commands, &block)
click to toggle source
# File lib/elastictastic/bulk_persistence_strategy.rb, line 122
# Appends an operation to the queue.
#
# index    - object responding to #name; with +id+ it forms the key used
#            to detect an earlier queued operation on the same document.
# id       - document id, or nil when none is known yet.
# commands - one or more hashes to be sent for this operation.
# block    - optional handler stored on the operation.
#
# An earlier operation queued for the same [index name, id] is flagged to
# be skipped, so only the latest one is sent. Flushes once the queue
# length reaches @auto_flush, if that is set.
def add(index, id, *commands, &block)
  key = [index.name, id]
  @operations_by_id[key].skip = true if id && @operations_by_id.key?(key)

  queued = Operation.new(id, commands, block)
  @operations << queued
  @operations_by_id[key] = queued

  flush if @auto_flush && @operations.length >= @auto_flush
end
bulk_identifier(index, type, id, routing, parent_id, version)
click to toggle source
# File lib/elastictastic/bulk_persistence_strategy.rb, line 113
# Builds the metadata hash that identifies a document in a bulk command.
#
# :_index and :_type are always present. '_id', '_version', '_routing'
# (stringified) and 'parent' are added, in that order, only when the
# corresponding argument is truthy.
def bulk_identifier(index, type, id, routing, parent_id, version)
  metadata = { :_index => index.name, :_type => type }
  optional_fields = [
    ['_id', id],
    ['_version', version],
    ['_routing', routing && routing.to_s],
    ['parent', parent_id]
  ]
  optional_fields.each do |key, value|
    metadata[key] = value if value
  end
  metadata
end
bulk_identifier_for_instance(instance)
click to toggle source
# File lib/elastictastic/bulk_persistence_strategy.rb, line 102
# Builds the bulk metadata hash for +instance+ from its index, its class's
# type and routing, its id, its parent id and its current version.
def bulk_identifier_for_instance(instance)
  index = instance.index
  type = instance.class.type
  id = instance.id
  routing = instance.class.route(instance)
  parent_id = instance._parent_id
  version = instance.version

  bulk_identifier(index, type, id, routing, parent_id, version)
end