class Krakow::Distribution::Default

Default distribution implementation. This uses a round-robin approach for less than ideal states.

Attributes

less_than_ideal_stack[R]
watch_dog[R]

Public Instance Methods

calculate_ready!(connection_identifier) click to toggle source

Update the ready count for a connection.

@param connection_identifier [String]
@return [Integer, nil]

# File lib/krakow/distribution/default.rb, line 99
# Recalculate the ready count stored in the registry entry of a connection.
#
# In a less than ideal state the ready count is simply reset to 0. Otherwise
# it becomes `ideal` minus the entry's `:in_flight` count. When that value is
# negative, or the entry's `:backoff_until` is still in the future, the ready
# count is forced to 0 and a timer is scheduled to recalculate later (and to
# re-send RDY unless the distribution has become less than ideal by then).
#
# @param connection_identifier [String]
# @return [Integer, nil] the new ready count, or the result of `warn` when a
#   connection failure/unavailable error is rescued
def calculate_ready!(connection_identifier)
  registry_info = registry_lookup(connection_identifier)
  if less_than_ideal?
    registry_info[:ready] = 0
  else
    registry_info[:ready] = ideal - registry_info[:in_flight]
    if registry_info[:ready] < 0 || registry_info[:backoff_until] > Time.now.to_i
      registry_info[:ready] = 0
      # The pending timer is stored on this connection's registry entry, so
      # the guard must inspect the entry (not the registry as a whole);
      # otherwise a stale timer is never cancelled before being replaced.
      registry_info[:backoff_timer].cancel if registry_info[:backoff_timer]
      registry_info[:backoff_timer] = after(registry_info[:backoff_until] - Time.now.to_i) do
        calculate_ready!(connection_identifier)
        set_ready_for(connection_lookup(connection_identifier)) unless less_than_ideal?
      end
    end
    registry_info[:ready]
  end
rescue Error::ConnectionFailure
  warn 'Failed connection encountered!'
rescue Error::ConnectionUnavailable
  warn 'Unavailable connection encountered!'
end
force_unready() click to toggle source

Force a connection to give up its RDY state so that the next connection in the stack can receive it.

@return [nil]

# File lib/krakow/distribution/default.rb, line 144
# Pick one connection that currently holds RDY at random, recalculate its
# ready count and push the result out via `set_ready_for`. A warning is
# logged when no connection holds RDY.
#
# @return [nil]
def force_unready
  debug 'Forcing a connection into an unready state due to less than ideal state'
  victim = rdy_connections.sample
  unless victim
    warn "Failed to locate available connection for RDY aquisition!"
    return nil
  end
  debug "Stripping RDY state from connection: #{victim}"
  calculate_ready!(victim.identifier)
  set_ready_for(victim)
  nil
end
less_than_ideal?() click to toggle source

Whether `ideal` is less than 1.

@return [TrueClass, FalseClass]

# File lib/krakow/distribution/default.rb, line 48
# Report whether the current `ideal` value has dropped below 1.
#
# @return [TrueClass, FalseClass]
def less_than_ideal?
  per_connection = ideal
  per_connection < 1
end
less_than_ideal_ready!() click to toggle source

Find next connection to receive RDY count

@return [Krakow::Connection, nil]

# File lib/krakow/distribution/default.rb, line 55
# Select the next connection that should receive a RDY count of 1.
#
# Connections are popped from `less_than_ideal_stack`; the stack is refilled
# from `waiting_connections` whenever it is nil or empty. A popped connection
# is skipped when its registry `:backoff_until` is still in the future. The
# search stops once a refill has happened during this call and the stack has
# run empty again. The selected connection's registry `:ready` is set to 1.
#
# @return [Krakow::Connection, nil] the selected connection, nil when none qualifies
def less_than_ideal_ready!
  refilled = false
  chosen = nil
  while chosen.nil? && !(refilled && less_than_ideal_stack.empty?)
    if less_than_ideal_stack.nil? || less_than_ideal_stack.empty?
      @less_than_ideal_stack = waiting_connections
      refilled = true
    end
    candidate = less_than_ideal_stack.pop
    next unless candidate
    backing_off = registry_lookup(candidate.identifier)[:backoff_until] > Time.now.to_i
    chosen = candidate unless backing_off
  end
  return nil unless chosen
  registry_lookup(chosen.identifier)[:ready] = 1
  chosen
end
rdy_connections() click to toggle source

All connections with RDY state

@return [Array<Krakow::Connection>]

# File lib/krakow/distribution/default.rb, line 135
# Collect the connections whose registry entry has a `:ready` value above 0.
# Identifiers for which `connection_lookup` yields nil are dropped.
#
# @return [Array<Krakow::Connection>]
def rdy_connections
  ready_ids = registry.select { |_id, info| info[:ready] > 0 }.map { |id, _info| id }
  ready_ids.map { |id| connection_lookup(id) }.compact
end
redistribute!() click to toggle source

Recalculate `ideal` and update RDY on connections.

# File lib/krakow/distribution/default.rb, line 12
# Recompute `ideal` (max in flight divided by the number of registry entries,
# 0 when the registry is empty) and bring every connection's RDY in line.
#
# Less than ideal: all registry ready counts are zeroed, the round-robin
# selection runs `max_in_flight` times, RDY is forced out to every connection
# and the watch dog timer is (re)started to call `force_unready` periodically.
#
# Otherwise: any running watch dog is cancelled and cleared, then each
# connection's ready count is recalculated and RDY is only re-sent when the
# count actually changed.
def redistribute!
  entry_count = registry.size
  @ideal = entry_count < 1 ? 0 : max_in_flight / entry_count
  debug "Distribution calculated ideal: #{ideal}"
  if less_than_ideal?
    registry.each { |_connection_id, info| info[:ready] = 0 }
    max_in_flight.times { less_than_ideal_ready! }
    connections.each { |connection| set_ready_for(connection, :force) }
    watch_dog.cancel if watch_dog
    @watch_dog = every(watch_dog_interval) { force_unready }
  else
    if watch_dog
      watch_dog.cancel
      @watch_dog = nil
    end
    connections.each do |connection|
      previous_ready = ready_for(connection.identifier)
      calculate_ready!(connection.identifier)
      # Skip the RDY update when recalculation left the count untouched
      next if previous_ready == ready_for(connection.identifier)
      debug "Redistribution ready setting update for connection #{connection}"
      set_ready_for(connection)
    end
  end
end
set_ready_for(connection, *args) click to toggle source

Adds extra functionality to provide round robin RDY setting when in less than ideal state

@param connection [Krakow::Connection]
@param args [Symbol]
@return [Krakow::FrameType::Error, nil]

Calls superclass method Krakow::Distribution#set_ready_for
# File lib/krakow/distribution/default.rb, line 82
# Send RDY for the connection through the superclass and, while in a less
# than ideal state, hand RDY to the next connection in the round robin.
#
# The round-robin step is skipped when `:force` is among `args` or when the
# distribution is not less than ideal; the method returns nil in that case.
# Otherwise the next connection is taken from `less_than_ideal_ready!`, the
# watch dog (if any) is reset and the superclass is invoked for it. A warning
# is logged when no next connection is available.
#
# @param connection [Krakow::Connection]
# @param args [Symbol] pass `:force` to bypass the round-robin step
# @return [Krakow::FrameType::Error, nil]
def set_ready_for(connection, *args)
  super connection
  return nil unless less_than_ideal? && !args.include?(:force)

  debug "RDY set ignored due to less than ideal state (con: #{connection})"
  successor = less_than_ideal_ready!
  if successor
    watch_dog.reset if watch_dog
    super successor
  else
    warn 'Failed to set RDY state while less than ideal. Connection stack is empty!'
  end
end
waiting_connections() click to toggle source

All connections without RDY state

@return [Array<Krakow::Connection>]

# File lib/krakow/distribution/default.rb, line 126
# Collect the connections whose registry entry shows no ready count, nothing
# in flight and a `:backoff_until` that lies before the current time.
# Identifiers for which `connection_lookup` yields nil are dropped.
#
# @return [Array<Krakow::Connection>]
def waiting_connections
  idle_entries = registry.select do |_id, info|
    without_rdy = info[:ready] < 1
    without_rdy && info[:in_flight] < 1 && info[:backoff_until] < Time.now.to_i
  end
  idle_entries.map { |id, _info| connection_lookup(id) }.compact
end