module Kafka::Retryable::HandleFailure::InstanceMethods

Public Instance Methods

handle_failure(message) { || ... }

Yields to the given block. If the block raises, runs the failure hooks and then the after-failure action.

@param message [Hash|String] the exact message that needs to be sent to Kafka. kafka_retryable does not modify this message and sends it to Kafka as is.

# File lib/kafka/retryable/handle_failure.rb, line 44
def handle_failure(message)
  yield
rescue => error
  run_failure_hooks(error, message)
  run_after_failure_action(error, message)
end
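The control flow above can be sketched in isolation. The following is a minimal stand-in, not the gem's module: `FailureFlowSketch`, `OrderConsumerSketch`, and the `events` recorder are hypothetical names, and the two hook bodies only record that they were called.

```ruby
# Hypothetical stand-in that mirrors the control flow of handle_failure.
# It is not the gem's module; the hook bodies are placeholders.
module FailureFlowSketch
  def handle_failure(message)
    yield
  rescue => error
    run_failure_hooks(error, message)
    run_after_failure_action(error, message)
  end

  # Records which hooks ran, in order, so the flow can be inspected.
  def events
    @events ||= []
  end

  private

  def run_failure_hooks(error, message)
    events << [:failure_hooks, error.class, message]
  end

  def run_after_failure_action(error, message)
    events << [:after_failure, error.class, message]
  end
end

# Hypothetical consumer that wraps its processing in handle_failure.
class OrderConsumerSketch
  include FailureFlowSketch

  def consume(message)
    handle_failure(message) do
      raise ArgumentError, "missing order id" unless message.key?(:order_id)
      :processed
    end
  end
end

consumer = OrderConsumerSketch.new
ok_result = consumer.consume({ order_id: 1 })  # block succeeds, no hooks run
consumer.consume({ total: 10 })                # block raises, both hooks run
```

Because the method uses a bare `rescue`, only `StandardError` and its subclasses are caught; other exceptions propagate to the caller.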

Private Instance Methods

configuration()

Fetches the failure configuration registered under this object's class name and memoizes it.

# File lib/kafka/retryable/handle_failure.rb, line 72
def configuration
  @configuration ||= failure_configuration[self.class.name]
end

error_in_blacklist?(error, exception_blacklist)

Returns false by default for all error classes. This means that every error class triggers failure handling unless it is listed in the blacklist.

# File lib/kafka/retryable/handle_failure.rb, line 84
def error_in_blacklist?(error, exception_blacklist)
  return false if (exception_blacklist || []).empty?
  exception_blacklist.include?(error)
end

error_in_whitelist?(error, exception_whitelist)

Returns true by default for all error classes. This means that every error class triggers failure handling by default. If the whitelist is present, only the errors in it trigger failure handling.

# File lib/kafka/retryable/handle_failure.rb, line 92
def error_in_whitelist?(error, exception_whitelist)
  return true if (exception_whitelist || []).empty?
  exception_whitelist.include?(error)
end
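The two filters combine into a single decision. Here is a minimal sketch of the documented rules, assuming the lists hold exception classes and that matching is done with `is_a?`; `triggers_failure_handling?` is a hypothetical helper and not part of kafka_retryable.

```ruby
# Hypothetical helper, not part of kafka_retryable. It encodes the documented
# rules and assumes the lists hold exception classes matched with is_a?.
def triggers_failure_handling?(error, whitelist: nil, blacklist: nil)
  whitelist = Array(whitelist)
  blacklist = Array(blacklist)

  # A blacklisted error never triggers failure handling.
  return false if blacklist.any? { |klass| error.is_a?(klass) }
  # With no whitelist, every remaining error triggers failure handling.
  return true if whitelist.empty?

  # With a whitelist, only listed errors trigger failure handling.
  whitelist.any? { |klass| error.is_a?(klass) }
end

no_lists    = triggers_failure_handling?(ArgumentError.new)
blacklisted = triggers_failure_handling?(ArgumentError.new, blacklist: [ArgumentError])
unlisted    = triggers_failure_handling?(ArgumentError.new, whitelist: [KeyError])
whitelisted = triggers_failure_handling?(KeyError.new, whitelist: [KeyError])
```

Matching with `is_a?` also covers subclasses of a listed class, which is a design choice of this sketch rather than something the source states.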
failure_configuration()

Returns the class-level @@failure_configuration hash, which is stored when the class is first loaded.

# File lib/kafka/retryable/handle_failure.rb, line 77
def failure_configuration
  self.class.failure_configuration
end
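To make the lookup concrete, the sketch below shows one shape the configuration hash might take. Only the key names (`:buffer`, `:dead_letter_queue`, `:exception_whitelist`, `:exception_blacklist`, `:after_failure`) and the lookup by class name come from the methods on this page; the class `PaymentConsumerSketch`, the constant, and every value are assumptions for illustration.

```ruby
# Hypothetical values throughout; only the key names and the lookup by
# class name are taken from the methods documented here.
class PaymentConsumerSketch
  FAILURE_CONFIGURATION = {
    "PaymentConsumerSketch" => {
      buffer: :kafka,                              # assumed buffer name
      dead_letter_queue: "payments.dead_letter",   # assumed topic name
      exception_whitelist: [],
      exception_blacklist: [],
      after_failure: ->(error, message) { "#{error.class}: #{message}" }
    }
  }.freeze

  # Stand-in for the class-level accessor used by failure_configuration.
  def self.failure_configuration
    FAILURE_CONFIGURATION
  end

  # Same memoized lookup as the configuration method: keyed by class name.
  def configuration
    @configuration ||= self.class.failure_configuration[self.class.name]
  end
end

config = PaymentConsumerSketch.new.configuration
summary = config[:after_failure].call(RuntimeError.new, "payload")
```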
run_after_failure_action(error, message)

Run any post-processing logic here, e.g. notifying Bugsnag.

@param error [Class]
@param message [Hash|String]

# File lib/kafka/retryable/handle_failure.rb, line 67
def run_after_failure_action(error, message)
  configuration[:after_failure].call(error, message)
end

run_failure_hooks(error, message)

Find the failure policy based on the buffer, and execute the logic within the policy.

@param error [Class]
@param message [Hash|String]

# File lib/kafka/retryable/handle_failure.rb, line 56
def run_failure_hooks(error, message)
  return if error_in_blacklist?(error, configuration[:exception_blacklist])
  return unless error_in_whitelist?(error, configuration[:exception_whitelist])

  policy = PolicyFinder.find_by(configuration[:buffer])
  policy.new(configuration[:dead_letter_queue], message).perform_failure_recovery
end
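The last two lines of the method follow a lookup-then-instantiate pattern: a finder maps the configured buffer to a policy class, which is built with the dead letter queue and the message and then asked to recover. The gem's own `PolicyFinder` and policy classes are not shown on this page, so the sketch below uses hypothetical stand-ins (`PolicyFinderSketch`, `LogOnlyPolicySketch`, the `:log_only` buffer) that only imitate those calls.

```ruby
# Hypothetical stand-ins: the gem's PolicyFinder and policy classes are not
# shown in this section, so these only imitate the calls made above.
class LogOnlyPolicySketch
  attr_reader :dead_letter_queue, :message

  def initialize(dead_letter_queue, message)
    @dead_letter_queue = dead_letter_queue
    @message = message
  end

  # Describes the recovery instead of talking to a real broker.
  def perform_failure_recovery
    "would publish #{message.inspect} to #{dead_letter_queue}"
  end
end

module PolicyFinderSketch
  POLICIES = { log_only: LogOnlyPolicySketch }.freeze

  # Returns the policy class registered for the buffer, or raises.
  def self.find_by(buffer)
    POLICIES.fetch(buffer) { raise ArgumentError, "unknown buffer: #{buffer.inspect}" }
  end
end

policy = PolicyFinderSketch.find_by(:log_only)
outcome = policy.new("orders.dead_letter", "payload").perform_failure_recovery
```

Returning a class rather than an instance keeps the finder free of per-message state; each failure gets its own policy object.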