class Krakow::Distribution
Message distribution @abstract
Attributes
@!parse include Krakow::Utils::Lazy::InstanceMethods
@!parse extend Krakow::Utils::Lazy::ClassMethods
@!parse include Krakow::Utils::Lazy::InstanceMethods
@!parse extend Krakow::Utils::Lazy::ClassMethods
@!parse include Krakow::Utils::Lazy::InstanceMethods
@!parse extend Krakow::Utils::Lazy::ClassMethods
Public Class Methods
@!endgroup
# File lib/krakow/distribution.rb, line 33 def initialize(args={}) super @ideal = 0 @flight_record = {} @registry = {} end
Public Instance Methods
Add connection to make available for RDY distribution
@param connection [Krakow::Connection] @return [TrueClass]
# File lib/krakow/distribution.rb, line 123 def add_connection(connection) unless(registry[connection.identifier]) registry[connection.identifier] = { :ready => initial_ready, :in_flight => 0, :failures => 0, :backoff_until => 0 } end true end
- Abstract
-
Determine RDY value for given connection
@param connection_identifier [String] @return [Integer]
# File lib/krakow/distribution.rb, line 48 def calculate_ready!(connection_identifier) raise NotImplementedError.new 'Custom `#calculate_ready!` method must be provided!' end
Return connection associated with given registry key
@param identifier [String] connection identifier @return [Krakow::Connection, nil]
# File lib/krakow/distribution.rb, line 156 def connection_lookup(identifier) consumer.connection(identifier) end
@return [Array<Krakow::Connection>] connections in registry
# File lib/krakow/distribution.rb, line 192 def connections registry.keys.map do |identifier| connection_lookup(identifier) end.compact end
Log failure of processed message
@param connection_identifier [String] @return [TrueClass]
# File lib/krakow/distribution.rb, line 202 def failure(connection_identifier) if(backoff_interval) registry_info = registry_lookup(connection_identifier) registry_info[:failures] += 1 registry_info[:backoff_until] = Time.now.to_i + (registry_info[:failures] * backoff_interval) end true end
Return source connection for given message ID
@param msg_id [String] @yield execute with connection @yieldparam connection [Krakow::Connection] @return [Krakow::Connection, Object]
# File lib/krakow/distribution.rb, line 166 def in_flight_lookup(msg_id) connection = connection_lookup(flight_record[msg_id]) unless(connection) abort Krakow::Error::LookupFailed.new("Failed to locate in flight message (ID: #{msg_id})") end if(block_given?) begin yield connection rescue => e abort e end else connection end end
Initial ready value used for new connections
@return [Integer]
# File lib/krakow/distribution.rb, line 99 def initial_ready ideal > 0 ? 1 : 0 end
Return the currently configured RDY value for given connnection
@param connection_identifier [String] @return [Integer]
# File lib/krakow/distribution.rb, line 79 def ready_for(connection_identifier) registry_lookup(connection_identifier)[:ready] end
- Abstract
-
Reset flight distributions
# File lib/krakow/distribution.rb, line 41 def redistribute! raise NotImplementedError.new 'Custom `#redistrubute!` method must be provided!' end
Registers message into registry and configures for distribution
@param message [FrameType::Message] @param connection_identifier [String] @return [Integer]
# File lib/krakow/distribution.rb, line 108 def register_message(message, connection_identifier) if(flight_record[message.message_id]) abort KeyError.new "Message is already registered in flight record! (#{message.message_id})" else registry_info = registry_lookup(connection_identifier) registry_info[:in_flight] += 1 flight_record[message.message_id] = connection_identifier calculate_ready!(connection_identifier) end end
Return registry information for given connection @param connection_identifier [String] @return [Hash] registry information @raise [Krakow::Error::LookupFailed]
# File lib/krakow/distribution.rb, line 186 def registry_lookup(connection_identifier) registry[connection_identifier] || abort(Krakow::Error::LookupFailed.new("Failed to locate connection information in registry (#{connection_identifier})")) end
Remove connection from RDY distribution
@param connection_identifier [String] @return [TrueClass]
# File lib/krakow/distribution.rb, line 139 def remove_connection(connection_identifier, *args) # remove connection from registry registry.delete(connection_identifier) # remove any in flight messages flight_record.delete_if do |k,v| if(v == connection_identifier) warn "Removing in flight reference due to failed connection: #{v}" true end end true end
Send RDY for given connection
@param connection [Krakow::Connection] @return [Krakow::FrameType::Error,nil]
# File lib/krakow/distribution.rb, line 88 def set_ready_for(connection, *_) connection.transmit( Command::Rdy.new( :count => ready_for(connection.identifier) ) ) end
Log success of processed message
@param connection_identifier [String] @return [TrueClass]
# File lib/krakow/distribution.rb, line 215 def success(connection_identifier) if(backoff_interval) registry_info = registry_lookup(connection_identifier) if(registry_info[:failures] > 1) registry_info[:failures] -= 1 registry_info[:backoff_until] = Time.now.to_i + (registry_info[:failures] * backoff_interval) else registry_info[:failures] = 0 end end true end
Remove message metadata from registry
@param message [Krakow::FrameType::Message, String] message or ID @return [Krakow::Connection, NilClass]
# File lib/krakow/distribution.rb, line 56 def unregister_message(message) msg_id = message.respond_to?(:message_id) ? message.message_id : message.to_s connection = connection_lookup(flight_record[msg_id]) flight_record.delete(msg_id) if(connection) begin ident = connection.identifier registry_info = registry_lookup(ident) registry_info[:in_flight] -= 1 calculate_ready!(ident) connection rescue Celluloid::DeadActorError warn 'Connection is dead. No recalculation applied on ready.' end else warn 'No connection associated to message via lookup. No recalculation applied on ready.' end end