class Sidekiq::Superworker::DSLHash

Attributes

record_id[RW]

Public Class Methods

new(hash, args={}) click to toggle source
# File lib/sidekiq/superworker/dsl_hash.rb, line 6
# Stores the DSL hash and the argument values that the other methods of this object read.
#
# @param hash [Hash] nested hash of subjob definitions; kept by reference, not copied
# @param args [Hash] argument values looked up by argument key; defaults to an empty hash
def initialize(hash, args={})
  @hash, @args = hash, args
end

Public Instance Methods

rewrite_record_ids(first_record_id) click to toggle source
# File lib/sidekiq/superworker/dsl_hash.rb, line 11
# Renumbers the keys of the DSL hash so that the first record receives +first_record_id+ and
# every following record (including nested children) receives the next consecutive id.
#
# @param first_record_id [Integer] id to assign to the first record
# @return [Hash] the renumbered hash produced by rewrite_ids_of_nested_hash
def rewrite_record_ids(first_record_id)
  # The renumbering helper increments the counter before using it, so start one below.
  @record_id = first_record_id - 1
  renumbered_hash = rewrite_ids_of_nested_hash(@hash)
  renumbered_hash
end
to_records() click to toggle source
# File lib/sidekiq/superworker/dsl_hash.rb, line 16
# Converts the DSL hash into a flat hash of records keyed by record id.
#
# Both the record store and the id counter are reset on every call, so ids start at 1.
#
# @return [Hash] whatever nested_hash_to_records returns for the stored hash
def to_records
  @records = {}
  @record_id = 1
  nested_hash_to_records(@hash)
end

Private Instance Methods

batch_values_to_batch_arrays(values) click to toggle source
# File lib/sidekiq/superworker/dsl_hash.rb, line 144
# Turns a list of batch argument values into one array of argument values per batch iteration.
#
# Every value that is not already an Array is wrapped in a one-element array. Arrays shorter
# than the longest one are repeated cyclically until they reach that length, and the arrays
# are then zipped so that the i-th result holds the i-th element of every array.
#
#   batch_values_to_batch_arrays([[1, 2, 3], 9])            # => [[1, 9], [2, 9], [3, 9]]
#   batch_values_to_batch_arrays([[1, 2, 3, 4, 5], [7, 8]]) # => [[1, 7], [2, 8], [3, 7], [4, 8], [5, 7]]
#
# @param values [Array] batch argument values; each is either an Array or a single value
# @return [Array<Array>] one array of argument values per iteration; empty when +values+ is empty
def batch_values_to_batch_arrays(values)
  arrays = values.map { |value| value.is_a?(Array) ? value : [value] }
  # Nothing to iterate over; previously this fell through to nil.zip and raised.
  return [] if arrays.empty?

  max_length = arrays.map(&:length).max
  padded_arrays = arrays.map do |array|
    next array if array.length == max_length

    # An empty array has nothing to repeat, so its position is filled with nils (this used to
    # raise ZeroDivisionError). Otherwise cycle through the elements; the earlier integer
    # division under-counted repetitions whenever max_length was not a multiple of the length.
    array.empty? ? Array.new(max_length) : array.cycle.take(max_length)
  end

  first_array, *other_arrays = padded_arrays
  first_array.zip(*other_arrays)
end
children_ids_for_batch(subjobs, batch_keys_to_iteration_keys) click to toggle source
# File lib/sidekiq/superworker/dsl_hash.rb, line 74
# Builds the records for every iteration of a batch and returns the ids of the per-iteration
# 'batch_child' records.
#
# For each array of iteration argument values, a 'batch_child' record is stored in @records,
# followed by a shallow copy of every subjob (with its :children key removed) whose parent is
# that batch child. The subjobs of one iteration are chained through :next_id, and each
# subjob's own children are expanded through nested_hash_to_records with the iteration's
# arguments as scoped args. @record_id is advanced for every record created here.
#
# @param subjobs [Hash] subjob definitions of the batch; only the values are used
# @param batch_keys_to_iteration_keys [Hash] maps batch argument keys to iteration argument keys
# @return [Array] ids of the created 'batch_child' records, in iteration order
def children_ids_for_batch(subjobs, batch_keys_to_iteration_keys)
  iteration_keys = batch_keys_to_iteration_keys.values
  value_arrays = get_batch_iteration_arg_value_arrays(batch_keys_to_iteration_keys)

  # The batch record is taken to be the one whose id was handed out just before this call.
  batch_id = @record_id - 1

  value_arrays.map do |iteration_values|
    # Pair each value with the iteration key found at the same position.
    iteration_args = {}
    iteration_values.each_with_index do |value, position|
      iteration_args[iteration_keys[position]] = value
    end

    batch_child_id = @record_id
    @record_id += 1
    batch_child = {
      subjob_id: batch_child_id,
      subworker_class: 'batch_child',
      arg_keys: iteration_keys,
      arg_values: iteration_args.values,
      parent_id: batch_id,
      children_ids: []
    }
    @records[batch_child_id] = batch_child

    previous_id = nil
    subjobs.values.each do |template|
      subjob_id = @record_id
      @record_id += 1

      # Work on a shallow copy so the definition can be reused for the next iteration.
      record = template.dup
      nested_children = record.delete(:children)
      record.merge!(
        subjob_id: subjob_id,
        parent_id: batch_child_id,
        arg_values: iteration_args.values
      )
      @records[subjob_id] = record

      @records[previous_id][:next_id] = subjob_id if previous_id
      previous_id = subjob_id

      nested_hash_to_records(nested_children, parent_id: subjob_id, scoped_args: iteration_args)
      batch_child[:children_ids] << subjob_id
    end

    batch_child_id
  end
end
get_batch_iteration_arg_value_arrays(batch_keys_to_iteration_keys) click to toggle source

Returns an array of argument-value arrays, one per batch iteration; each inner array holds the argument values to pass to that iteration.

# File lib/sidekiq/superworker/dsl_hash.rb, line 124
# Collects the values stored in @args for the batch keys and converts them into one array of
# argument values per batch iteration.
#
# @param batch_keys_to_iteration_keys [Hash] mapping whose keys select the entries of @args
# @return [Array<Array>] the result of batch_values_to_batch_arrays for the selected values
def get_batch_iteration_arg_value_arrays(batch_keys_to_iteration_keys)
  selected_args = @args.slice(*batch_keys_to_iteration_keys.keys)
  batch_values_to_batch_arrays(selected_args.values)
end
nested_hash_to_records(nested_hash, options={}) click to toggle source
# File lib/sidekiq/superworker/dsl_hash.rb, line 24
# Flattens a nested hash of subjob definitions into @records, assigning each definition the
# next value of @record_id, and returns @records.
#
# Sibling records are chained through :next_id and every record is appended to its parent's
# :children_ids when the parent is present in @records. A definition whose :subworker_class is
# :batch gets its :children_ids from children_ids_for_batch and is not recursed into here;
# any other definition with :children is expanded recursively.
#
# @param nested_hash [Hash] subjob definitions; only the values are used
# @param options [Hash] :parent_id (id of the parent record, default nil) and :scoped_args
#   (argument values that take the place of @args when resolving Symbol arg keys, default nil);
#   missing keys are filled in on the passed hash itself
# @return [Hash] @records
def nested_hash_to_records(nested_hash, options={})
  return @records if nested_hash.blank?

  options.reverse_merge!(parent_id: nil, scoped_args: nil)
  parent_id = options[:parent_id]
  # Args that are scoped to this subset of the nested hash (necessary for batch hashes)
  scoped_args = options[:scoped_args]
  previous_id = nil

  nested_hash.values.each do |node|
    id = @record_id
    @record_id += 1
    is_batch = node[:subworker_class] == :batch

    # A Symbol arg key is a reference that is resolved against the scoped args (when given) or
    # @args; anything else was written literally in the superworker definition and is used as
    # the value itself.
    arg_values = node[:arg_keys].map do |arg_key|
      next arg_key unless arg_key.is_a?(Symbol)

      scoped_args ? scoped_args[arg_key] : @args[arg_key]
    end

    record = {
      subjob_id: id,
      subworker_class: node[:subworker_class].to_s,
      arg_keys: node[:arg_keys],
      arg_values: arg_values,
      parent_id: parent_id
    }
    @records[id] = record

    if is_batch
      # Identity mapping of every @args key, built once; the batch's own mapping (its first
      # arg value) is merged on top of it.
      @arg_keys_to_arg_keys ||= @args.keys.each_with_object({}) { |key, identity| identity[key] = key }
      batch_keys_to_iteration_keys = @arg_keys_to_arg_keys.merge(arg_values[0])
      record[:children_ids] = children_ids_for_batch(node[:children], batch_keys_to_iteration_keys)
    end

    @records[previous_id][:next_id] = id if @records[previous_id]
    previous_id = id

    parent = parent_id && @records[parent_id]
    (parent[:children_ids] ||= []) << id if parent

    next if is_batch || !node[:children]

    nested_hash_to_records(node[:children], parent_id: id, scoped_args: scoped_args)
  end
  @records
end
rewrite_ids_of_nested_hash(nested_hash) click to toggle source
# File lib/sidekiq/superworker/dsl_hash.rb, line 131
# Returns a new hash holding the same records as +nested_hash+, keyed by consecutive ids
# taken from @record_id (which is incremented before each use), parents before their children.
#
# The record hashes themselves are reused rather than copied: a record's :children entry is
# replaced in place with its renumbered version.
#
# @param nested_hash [Hash] records keyed by their previous ids; the previous ids are ignored
# @return [Hash] the records keyed by their new ids
def rewrite_ids_of_nested_hash(nested_hash)
  nested_hash.each_with_object({}) do |(_previous_id, record), renumbered|
    @record_id += 1
    renumbered[@record_id] = record
    record[:children] = rewrite_ids_of_nested_hash(record[:children]) if record[:children]
  end
end