class Krakow::Distribution

Message distribution @abstract

Attributes

flight_record[RW]
registry[RW]

Public Class Methods

new(args={}) click to toggle source

@!endgroup

Calls superclass method
# File lib/krakow/distribution.rb, line 33
# Build a new distribution with empty tracking state.
#
# Delegates to the superclass constructor first (bare `super` forwards
# `args` unchanged), then starts with an ideal value of zero, no in flight
# messages and no registered connections.
#
# @param args [Hash] forwarded untouched to the superclass constructor
def initialize(args={})
  super
  @flight_record = {}
  @ideal = 0
  @registry = {}
end

Public Instance Methods

add_connection(connection) click to toggle source

Add connection to make available for RDY distribution

@param connection [Krakow::Connection] @return [TrueClass]

# File lib/krakow/distribution.rb, line 123
# Add connection to make available for RDY distribution.
#
# A connection that is already present in the registry keeps its existing
# entry; only unknown identifiers receive a fresh record.
#
# @param connection [Krakow::Connection]
# @return [TrueClass]
def add_connection(connection)
  ident = connection.identifier
  # `||=` only builds (and evaluates `initial_ready` for) a new entry when
  # no truthy entry exists yet for this identifier.
  registry[ident] ||= {
    :ready => initial_ready,
    :in_flight => 0,
    :failures => 0,
    :backoff_until => 0
  }
  true
end
calculate_ready!(connection_identifier) click to toggle source
Abstract

Determine RDY value for given connection

@param connection_identifier [String] @return [Integer]

# File lib/krakow/distribution.rb, line 48
# Determine RDY value for given connection.
#
# Abstract: this base implementation always raises; concrete distributions
# are expected to provide their own version.
#
# @param connection_identifier [String]
# @return [Integer]
# @raise [NotImplementedError] always, in this base implementation
def calculate_ready!(connection_identifier)
  raise NotImplementedError, 'Custom `#calculate_ready!` method must be provided!'
end
connection_lookup(identifier) click to toggle source

Return connection associated with given registry key

@param identifier [String] connection identifier @return [Krakow::Connection, nil]

# File lib/krakow/distribution.rb, line 156
# Return connection associated with given registry key.
#
# The lookup is delegated to the consumer's `connection` method.
#
# @param identifier [String] connection identifier
# @return [Krakow::Connection, nil]
def connection_lookup(identifier)
  owner = consumer
  owner.connection(identifier)
end
connections() click to toggle source

@return [Array<Krakow::Connection>] connections in registry

# File lib/krakow/distribution.rb, line 192
# Resolve every identifier held in the registry to its connection.
#
# Identifiers whose lookup yields nil are left out of the result.
#
# @return [Array<Krakow::Connection>] connections in registry
def connections
  found = []
  # Iterate over a snapshot of the keys, in registry order.
  registry.keys.each do |identifier|
    located = connection_lookup(identifier)
    found << located unless located.nil?
  end
  found
end
failure(connection_identifier) click to toggle source

Log failure of processed message

@param connection_identifier [String] @return [TrueClass]

# File lib/krakow/distribution.rb, line 202
# Log failure of processed message.
#
# Does nothing when no backoff interval is configured. Otherwise the
# connection's failure count is incremented and its `:backoff_until` value
# becomes the current epoch second plus (failure count * backoff interval).
#
# @param connection_identifier [String]
# @return [TrueClass]
def failure(connection_identifier)
  return true unless backoff_interval
  info = registry_lookup(connection_identifier)
  info[:failures] += 1
  delay = info[:failures] * backoff_interval
  info[:backoff_until] = Time.now.to_i + delay
  true
end
in_flight_lookup(msg_id) { |connection| ... } click to toggle source

Return source connection for given message ID

@param msg_id [String] @yield execute with connection @yieldparam connection [Krakow::Connection] @return [Krakow::Connection, Object]

# File lib/krakow/distribution.rb, line 166
# Return source connection for given message ID.
#
# The flight record maps the message ID to a connection identifier, which
# is then resolved to a connection. When no connection can be resolved a
# lookup failure is aborted. Any error raised inside the supplied block is
# handed to `abort` as well.
#
# @param msg_id [String]
# @yield execute with connection
# @yieldparam connection [Krakow::Connection]
# @return [Krakow::Connection, Object] the connection, or the block's result
#   when a block is given
def in_flight_lookup(msg_id)
  connection = connection_lookup(flight_record[msg_id])
  connection || abort(Krakow::Error::LookupFailed.new("Failed to locate in flight message (ID: #{msg_id})"))
  return connection unless block_given?
  begin
    yield connection
  rescue => error
    abort error
  end
end
initial_ready() click to toggle source

Initial ready value used for new connections

@return [Integer]

# File lib/krakow/distribution.rb, line 99
# Initial ready value used for new connections.
#
# @return [Integer] 1 when the ideal value is above zero, otherwise 0
def initial_ready
  if ideal > 0
    1
  else
    0
  end
end
ready_for(connection_identifier) click to toggle source

Return the currently configured RDY value for given connection

@param connection_identifier [String] @return [Integer]

# File lib/krakow/distribution.rb, line 79
# Return the currently configured RDY value for given connection.
#
# Reads the `:ready` entry of the connection's registry information.
#
# @param connection_identifier [String]
# @return [Integer]
def ready_for(connection_identifier)
  info = registry_lookup(connection_identifier)
  info[:ready]
end
redistribute!() click to toggle source
Abstract

Reset flight distributions

# File lib/krakow/distribution.rb, line 41
# Reset flight distributions.
#
# Abstract: this base implementation always raises; concrete distributions
# are expected to provide their own version.
#
# @raise [NotImplementedError] always, in this base implementation
def redistribute!
  # The message names the method that must be implemented, so it has to
  # spell `#redistribute!` exactly as the method is defined.
  raise NotImplementedError, 'Custom `#redistribute!` method must be provided!'
end
register_message(message, connection_identifier) click to toggle source

Registers message into registry and configures for distribution

@param message [FrameType::Message] @param connection_identifier [String] @return [Integer]

# File lib/krakow/distribution.rb, line 108
# Registers message into registry and configures for distribution.
#
# A message whose ID is already present in the flight record is rejected
# by aborting with a KeyError. Otherwise the connection's in flight count
# is incremented, the message ID is mapped to the connection identifier,
# and the connection's RDY value is recalculated.
#
# @param message [FrameType::Message]
# @param connection_identifier [String]
# @return [Integer] result of `calculate_ready!`
def register_message(message, connection_identifier)
  if flight_record[message.message_id]
    return abort(KeyError.new("Message is already registered in flight record! (#{message.message_id})"))
  end
  registry_lookup(connection_identifier)[:in_flight] += 1
  flight_record[message.message_id] = connection_identifier
  calculate_ready!(connection_identifier)
end
registry_lookup(connection_identifier) click to toggle source

Return registry information for given connection

@param connection_identifier [String] @return [Hash] registry information @raise [Krakow::Error::LookupFailed]

# File lib/krakow/distribution.rb, line 186
# Return registry information for given connection.
#
# Aborts with a lookup failure when the registry holds no (truthy) entry
# for the identifier.
#
# @param connection_identifier [String]
# @return [Hash] registry information
# @raise [Krakow::Error::LookupFailed]
def registry_lookup(connection_identifier)
  info = registry[connection_identifier]
  return info if info
  abort(Krakow::Error::LookupFailed.new("Failed to locate connection information in registry (#{connection_identifier})"))
end
remove_connection(connection_identifier, *args) click to toggle source

Remove connection from RDY distribution

@param connection_identifier [String] @return [TrueClass]

# File lib/krakow/distribution.rb, line 139
# Remove connection from RDY distribution.
#
# Drops the connection's registry entry and discards every flight record
# entry that points at it, emitting one warning per discarded entry.
#
# @param connection_identifier [String]
# @param args [Array] accepted but not used
# @return [TrueClass]
def remove_connection(connection_identifier, *args)
  registry.delete(connection_identifier)
  flight_record.delete_if do |_msg_id, owner|
    # Keep in flight entries that belong to other connections.
    next false unless owner == connection_identifier
    warn "Removing in flight reference due to failed connection: #{owner}"
    true
  end
  true
end
set_ready_for(connection, *_) click to toggle source

Send RDY for given connection

@param connection [Krakow::Connection] @return [Krakow::FrameType::Error,nil]

# File lib/krakow/distribution.rb, line 88
# Send RDY for given connection.
#
# Builds an RDY command whose count is the connection's currently
# configured ready value and transmits it over the connection.
#
# @param connection [Krakow::Connection]
# @return [Krakow::FrameType::Error, nil]
def set_ready_for(connection, *_)
  rdy_command = Command::Rdy.new(:count => ready_for(connection.identifier))
  connection.transmit(rdy_command)
end
success(connection_identifier) click to toggle source

Log success of processed message

@param connection_identifier [String] @return [TrueClass]

# File lib/krakow/distribution.rb, line 215
# Log success of processed message.
#
# Does nothing when no backoff interval is configured. With more than one
# recorded failure, the failure count is decremented and `:backoff_until`
# is recomputed from the reduced count; otherwise the failure count is
# reset to zero and `:backoff_until` is left as is.
#
# @param connection_identifier [String]
# @return [TrueClass]
def success(connection_identifier)
  return true unless backoff_interval
  info = registry_lookup(connection_identifier)
  remaining = info[:failures] - 1
  if remaining > 0
    info[:failures] = remaining
    info[:backoff_until] = Time.now.to_i + (remaining * backoff_interval)
  else
    info[:failures] = 0
  end
  true
end
unregister_message(message) click to toggle source

Remove message metadata from registry

@param message [Krakow::FrameType::Message, String] message or ID @return [Krakow::Connection, NilClass]

# File lib/krakow/distribution.rb, line 56
# Remove message metadata from registry.
#
# The message's flight record entry is always deleted. When the entry
# resolves to a connection, that connection's in flight count is
# decremented and its RDY value recalculated; otherwise only a warning is
# emitted.
#
# @param message [Krakow::FrameType::Message, String] message or ID
# @return [Krakow::Connection, NilClass]
def unregister_message(message)
  # Accept either a message object or a bare ID.
  msg_id = if message.respond_to?(:message_id)
    message.message_id
  else
    message.to_s
  end
  connection = connection_lookup(flight_record[msg_id])
  flight_record.delete(msg_id)
  unless connection
    return warn('No connection associated to message via lookup. No recalculation applied on ready.')
  end
  begin
    ident = connection.identifier
    registry_lookup(ident)[:in_flight] -= 1
    calculate_ready!(ident)
    connection
  rescue Celluloid::DeadActorError
    warn 'Connection is dead. No recalculation applied on ready.'
  end
end