class Deimos::ActiveRecordConsumer

To configure batch vs. message mode, change the delivery mode of your Phobos listener. Message-by-message -> use `delivery: message` or `delivery: batch` Batch -> use `delivery: inline_batch`

Public Class Methods

compacted(val) click to toggle source

param val [Boolean] Turn pre-compaction of the batch on or off. If true, only the last message for each unique key in a batch is processed.

# File lib/deimos/active_record_consumer.rb, line 34
def compacted(val)
  config[:compacted] = val
end
new() click to toggle source

Setup

# File lib/deimos/active_record_consumer.rb, line 40
def initialize
  @klass = self.class.config[:record_class]
  @converter = ActiveRecordConsume::SchemaModelConverter.new(self.class.decoder, @klass)

  if self.class.config[:key_schema]
    @key_converter = ActiveRecordConsume::SchemaModelConverter.new(self.class.key_decoder, @klass)
  end

  @compacted = self.class.config[:compacted] != false
end
record_class(klass) click to toggle source

param klass [Class < ActiveRecord::Base] the class used to save to the database.

# File lib/deimos/active_record_consumer.rb, line 28
def record_class(klass)
  config[:record_class] = klass
end

Public Instance Methods

process_message?(_payload) click to toggle source

Override this message to conditionally save records @param payload [Hash] The kafka message as a hash @return [Boolean] if true, record is created/update.

If false, record processing is skipped but message offset is still committed.
# File lib/deimos/active_record_consumer.rb, line 63
def process_message?(_payload)
  true
end
record_attributes(payload, _key=nil) click to toggle source

Override this method (with `super`) if you want to add/change the default attributes set to the new/existing record. @param payload [Hash] @param _key [String]

# File lib/deimos/active_record_consumer.rb, line 55
def record_attributes(payload, _key=nil)
  @converter.convert(payload)
end