module Flex::LiveReindex
private module
Attributes
each_change[R]
Public Instance Methods
on_each_change(&block)
click to toggle source
# File lib/flex/live_reindex_admin.rb, line 54 def on_each_change(&block) @each_change = block end
on_reindex(&block)
click to toggle source
# File lib/flex/live_reindex_admin.rb, line 50 def on_reindex(&block) @reindex = block end
on_stop_indexing(&block)
click to toggle source
# File lib/flex/live_reindex_admin.rb, line 59 def on_stop_indexing(&block) @stop_indexing = block end
prefix_index(index)
click to toggle source
# File lib/flex/live_reindex_admin.rb, line 102 def prefix_index(index) base = unprefix_index(index) # raise if base is not included in @ensure_indices raise ExtraIndexError, "The index #{base} is missing from the :ensure_indices option. Reindexing aborted." \ if @ensure_indices && !@ensure_indices.include?(base) prefixed = @timestamp + base unless @indices.include?(base) unless Flex.exist?(:index => prefixed) config_hash[base] = {} unless config_hash.has_key?(base) Flex.POST "/#{prefixed}", config_hash[base] end @indices |= [base] end prefixed end
reindex(opts={}) { |self| ... }
click to toggle source
# File lib/flex/live_reindex_admin.rb, line 63 def reindex(opts={}) yield self perform(opts) end
reindex_indices(opts={}) { |self| ... }
click to toggle source
# File lib/flex/live_reindex_admin.rb, line 68 def reindex_indices(opts={}) yield self if block_given? opts[:verbose] = true unless opts.has_key?(:verbose) opts[:index] ||= opts.delete(:indices) || config_hash.keys # we override the on_reindex eventually set on_reindex do migrate_indices(opts) end perform(opts) end
should_prefix_index?()
click to toggle source
# File lib/flex/live_reindex_admin.rb, line 82 def should_prefix_index? Redis.get(:pid) == $$.to_s end
should_track_change?()
click to toggle source
# File lib/flex/live_reindex_admin.rb, line 86 def should_track_change? pid = Redis.get(:pid) !!pid && !(pid == $$.to_s) end
track_change(action, document)
click to toggle source
# File lib/flex/live_reindex_admin.rb, line 91 def track_change(action, document) Redis.rpush(:changes, MultiJson.encode([action, document])) end
track_external_change(app_id, action, document)
click to toggle source
use this method when you are tracking the change of another app you must pass the app_id of the app being affected by the change
# File lib/flex/live_reindex_admin.rb, line 97 def track_external_change(app_id, action, document) return unless Conf.redis Conf.redis.rpush("#{KEYS[:changes]}-#{app_id}", MultiJson.encode([action, document])) end
unprefix_index(index)
click to toggle source
remove the (eventual) prefix
# File lib/flex/live_reindex_admin.rb, line 119 def unprefix_index(index) index.sub(/^\d{14}_/, '') end
Private Instance Methods
build_bulk_string(action, document)
click to toggle source
# File lib/flex/live_reindex_admin.rb, line 231 def build_bulk_string(action, document) result = if @each_change document.extend(Result::Document) document.extend(Result::DocumentLoader) @each_change.call(action, document) else [{ action => document }] end result = [result] unless result.is_a?(Array) bulk_string = '' result.compact.each do |hash| act, doc = hash.to_a.flatten bulk_string << Flex.build_bulk_string(doc, :action => act) end bulk_string end
build_bulk_string_from_change(change)
click to toggle source
# File lib/flex/live_reindex_admin.rb, line 225 def build_bulk_string_from_change(change) action, document = MultiJson.decode(change) return '' unless @indices.include?(unprefix_index(document['_index'])) build_bulk_string(action, document) end
config_hash()
click to toggle source
# File lib/flex/live_reindex_admin.rb, line 125 def config_hash @config_hash ||= ModelTasks.new.config_hash end
migrate_indices(opts)
click to toggle source
# File lib/flex/live_reindex_admin.rb, line 205 def migrate_indices(opts) opts[:verbose] = true unless opts.has_key?(:verbose) pbar = ProgBar.new(Flex.count(opts)['count'], nil, "index #{opts[:index].inspect}: ") if opts[:verbose] Flex.dump_all(opts) do |batch| result = process_and_post_batch(batch) pbar.process_result(result, batch.size) if opts[:verbose] end pbar.finish if opts[:verbose] end
perform(opts={})
click to toggle source
# File lib/flex/live_reindex_admin.rb, line 129 def perform(opts={}) Conf.logger.warn 'Safe reindex is disabled!' if opts[:safe_reindex] == false Redis.init @indices = [] @timestamp = Time.now.strftime('%Y%m%d%H%M%S_') @ensure_indices = nil unless opts[:on_stop_indexing] == false || Conf.on_stop_indexing == false @stop_indexing ||= Conf.on_stop_indexing || raise(MissingStopIndexingProcError, 'The on_stop_indexing block is not set.') end raise MissingOnReindexBlockError, 'You must configure an on_reindex block.' \ unless @reindex raise MissingEnsureIndicesError, 'You must pass the :ensure_indices option when you pass the :models option.' \ if opts.has_key?(:models) && !opts.has_key?(:ensure_indices) if opts[:ensure_indices] @ensure_indices = opts.delete(:ensure_indices) @ensure_indices = @ensure_indices.split(',') unless @ensure_indices.is_a?(Array) each_change = @each_change @each_change = nil migrate_indices(:index => @ensure_indices) @each_change = each_change end @reindex.call # when the reindexing is finished we try to empty the changes list a few times tries = 0 bulk_string = '' until (count = Redis.llen(:changes)) == 0 || tries > 9 count.times { bulk_string << build_bulk_string_from_change(Redis.lpop(:changes))} Flex.post_bulk_string(:bulk_string => bulk_string) bulk_string = '' tries += 1 end # at this point the changes list should be empty or contain the minimum number of changes we could achieve live # the @stop_indexing should ensure to stop/suspend all the actions that would produce changes in the indices being reindexed @stop_indexing.call if @stop_indexing # if we have still changes, we can index them (until the list will be empty) bulk_string = '' while (change = Redis.lpop(:changes)) bulk_string << build_bulk_string_from_change(change) end Flex.post_bulk_string(:bulk_string => bulk_string) # deletes the old indices and create the aliases to the new @indices.each do |index| Flex.delete_index :index => index Flex.put_index_alias :alias => index, :index => @timestamp + index end # after the execution of this method the user should deploy the new code and then resume the regular app processing # we redefine this method so it will raise an error if any new live-reindex is attempted during this session. unless opts[:safe_reindex] == false class_eval <<-ruby, __FILE__, __LINE__ def perform(*) raise MultipleReindexError, "Multiple live-reindex attempted! You cannot use any reindexing method multiple times in the same session or you may corrupt your index/indices! The previous reindexing in this session successfully reindexed and swapped the new index/indices: #{@indices.map{|i| @timestamp + i}.join(', ')}. You must deploy now, and run the other reindexing in single successive deploys ASAP. Notice that if the code-changes that you are about to deploy rely on the successive reindexings that have been aborted, your app may fail. If you are working in development mode you must restart the session now. The next time you can silence this error by passing :safe_reindex => false" end ruby end rescue Exception # delete all the created indices @indices ||=[] @indices.each do |index| Flex.delete_index :index => @timestamp + index end raise ensure Redis.reset_keys end
process_and_post_batch(batch)
click to toggle source
# File lib/flex/live_reindex_admin.rb, line 217 def process_and_post_batch(batch) bulk_string = '' batch.each do |document| bulk_string << build_bulk_string('index', document) end Flex.post_bulk_string(:bulk_string => bulk_string) end