class Deimos::Consumer
Basic consumer class. Inherit from this class and override either consume or consume_batch, depending on the delivery mode of your listener. `consume` -> use `delivery :message` or `delivery :batch` `consume_batch` -> use `delivery :inline_batch`
Public Class Methods
decoder()
click to toggle source
@return [Deimos::SchemaBackends::Base]
# File lib/deimos/consumer.rb, line 21 def decoder @decoder ||= Deimos.schema_backend(schema: config[:schema], namespace: config[:namespace]) end
key_decoder()
click to toggle source
@return [Deimos::SchemaBackends::Base]
# File lib/deimos/consumer.rb, line 27 def key_decoder @key_decoder ||= Deimos.schema_backend(schema: config[:key_schema], namespace: config[:namespace]) end
Public Instance Methods
decode_key(key)
click to toggle source
Helper method to decode an encoded key. @param key [String] @return [Object] the decoded key.
# File lib/deimos/consumer.rb, line 36 def decode_key(key) return nil if key.nil? config = self.class.config unless config[:key_configured] raise 'No key config given - if you are not decoding keys, please use '\ '`key_config plain: true`' end if config[:key_field] self.class.decoder.decode_key(key, config[:key_field]) elsif config[:key_schema] self.class.key_decoder.decode(key, schema: config[:key_schema]) else # no encoding key end end
Private Instance Methods
_error(exception, payload, metadata)
click to toggle source
@param exception [Exception] @param payload [Hash] @param metadata [Hash]
# File lib/deimos/consumer.rb, line 96 def _error(exception, payload, metadata) Deimos.config.tracer&.set_error(@span, exception) raise if Deimos.config.consumers.reraise_errors || Deimos.config.consumers.fatal_error&.call(exception, payload, metadata) || fatal_error?(exception, payload, metadata) end
_report_time_delayed(payload, metadata)
click to toggle source
# File lib/deimos/consumer.rb, line 66 def _report_time_delayed(payload, metadata) return if payload.nil? || payload['timestamp'].blank? begin time_delayed = Time.now.in_time_zone - payload['timestamp'].to_datetime rescue ArgumentError Deimos.config.logger.info( message: "Error parsing timestamp! #{payload['timestamp']}" ) return end Deimos.config.metrics&.histogram('handler', time_delayed, tags: %W( time:time_delayed topic:#{metadata[:topic]} )) end
_with_span() { || ... }
click to toggle source
# File lib/deimos/consumer.rb, line 56 def _with_span @span = Deimos.config.tracer&.start( 'deimos-consumer', resource: self.class.name.gsub('::', '-') ) yield ensure Deimos.config.tracer&.finish(@span) end
fatal_error?(_error, _payload, _metadata)
click to toggle source
Overrideable method to determine if a given error should be considered “fatal” and always be reraised. @param _error [Exception] @param _payload [Hash] @param _metadata [Hash] @return [Boolean]
# File lib/deimos/consumer.rb, line 89 def fatal_error?(_error, _payload, _metadata) false end