module Kafka::Retryable::HandleFailure::InstanceMethods
Public Instance Methods
@param message [Hash|String] this should be the exact message the needs to be sent to Kafka
. kafka_retryable does not perform any modifications on this message and sends it as it is to Kafka
# File lib/kafka/retryable/handle_failure.rb, line 44 def handle_failure(message) yield rescue => error run_failure_hooks(error, message) run_after_failure_action(error, message) end
Private Instance Methods
Fetch configuration for the class
# File lib/kafka/retryable/handle_failure.rb, line 72 def configuration @configuration ||= failure_configuration[self.class.name] end
This is false by default for all error classes It means that all error classes will trigger failure handling unless they are specified in the blacklist
# File lib/kafka/retryable/handle_failure.rb, line 84 def error_in_blacklist?(error, exception_blacklist) return false if (exception_blacklist || []).empty? exception_blacklist.include?(error) end
This is true by default for all error classes It means that all error classes will trigger failure handling by default If this list exists then only the errors in this list will trigger failure handling
# File lib/kafka/retryable/handle_failure.rb, line 92 def error_in_whitelist?(error, exception_whitelist) return true if (exception_whitelist || []).empty? !exception_whitelist.include?(error) end
@@failure_configuration is stored in a hash when the class is first loaded
# File lib/kafka/retryable/handle_failure.rb, line 77 def failure_configuration self.class.failure_configuration end
Run any post-processing logic here, e.g. notifying Bugsnag @param error [Class] @param message [Hash|String]
# File lib/kafka/retryable/handle_failure.rb, line 67 def run_after_failure_action(error, message) configuration[:after_failure].call(error, message) end
Find the failure policy based on the buffer, and execute the logic within the policy @param error [Class] @param message [Hash|String]
# File lib/kafka/retryable/handle_failure.rb, line 56 def run_failure_hooks(error, message) return if error_in_blacklist?(error, configuration[:exception_blacklist]) return unless error_in_whitelist?(error, configuration[:exception_whitelist]) policy = PolicyFinder.find_by(configuration[:buffer]) policy.new(configuration[:dead_letter_queue], message).perform_failure_recovery end