class TopologicalInventory::Providers::Common::SaveInventory::Saver

Constants

KAFKA_PAYLOAD_MAX_BYTES_DEFAULT

As defined in github.com/zendesk/ruby-kafka/blob/02f7e2816e1130c5202764c275e36837f57ca4af/lib/kafka/protocol/message.rb#L11-L17, at least 112 bytes are added as a message header, so we need to leave room for that. Let's make it 512 bytes, just to be safe.

KAFKA_RESERVED_HEADER_SIZE
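
A sketch of how these constants combine into the usable per-message budget (the default below is an assumed value for illustration; only the 512-byte reserve is stated above):

KAFKA_PAYLOAD_MAX_BYTES_DEFAULT = 750_000 # assumed value, for illustration only
KAFKA_RESERVED_HEADER_SIZE      = 512     # per the note above

# Usable payload budget for a single message:
KAFKA_PAYLOAD_MAX_BYTES_DEFAULT - KAFKA_RESERVED_HEADER_SIZE #=> 749_488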

Attributes

client[R]
logger[R]
max_bytes[R]

Public Class Methods

new(client:, logger:, max_bytes: KAFKA_PAYLOAD_MAX_BYTES_DEFAULT)
# File lib/topological_inventory/providers/common/save_inventory/saver.rb, line 15
def initialize(client:, logger:, max_bytes: KAFKA_PAYLOAD_MAX_BYTES_DEFAULT)
  @client    = client
  @logger    = logger
  @max_bytes = payload_max_size(max_bytes)
end
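
A minimal construction sketch, assuming ingress_client is a hypothetical API client object that responds to save_inventory_with_http_info (the call the private save_inventory method makes below) and that a standard Logger is acceptable:

require "logger"

saver = TopologicalInventory::Providers::Common::SaveInventory::Saver.new(
  :client => ingress_client, # hypothetical client with #save_inventory_with_http_info
  :logger => Logger.new($stdout)
)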

Public Instance Methods

save(data)

@return [Integer] The total number of parts the payload was divided into

# File lib/topological_inventory/providers/common/save_inventory/saver.rb, line 24
def save(data)
  inventory = data[:inventory].to_hash

  inventory_json = JSON.generate(inventory)
  if inventory_json.size < max_bytes
    save_inventory(inventory_json)
    return 1
  else
    # Drop the reference so the GC can reclaim the serialized string
    inventory_json = nil
    return save_payload_in_batches(inventory)
  end
end
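
A usage sketch, assuming inventory is an object that responds to to_hash (as save requires) and saver was built as above:

parts = saver.save(:inventory => inventory)
saver.logger.info("Inventory sent in #{parts} part(s)")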

Private Instance Methods

build_new_collection(collection)
# File lib/topological_inventory/providers/common/save_inventory/saver.rb, line 111
def build_new_collection(collection)
  {:name => collection[:name], :data => []}
end
build_new_inventory(inventory)
# File lib/topological_inventory/providers/common/save_inventory/saver.rb, line 115
def build_new_inventory(inventory)
  new_inventory                           = inventory.clone
  new_inventory[:refresh_state_part_uuid] = SecureRandom.uuid
  new_inventory[:collections]             = []
  new_inventory
end
payload_max_size(max_bytes)
# File lib/topological_inventory/providers/common/save_inventory/saver.rb, line 122
def payload_max_size(max_bytes)
  if ENV['KAFKA_PAYLOAD_MAX_BYTES']
    max_bytes.clamp(5_000, ENV['KAFKA_PAYLOAD_MAX_BYTES'].to_i) - KAFKA_RESERVED_HEADER_SIZE
  else
    max_bytes - KAFKA_RESERVED_HEADER_SIZE
  end
end
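
An illustration of the clamping arithmetic with made-up numbers (only the 512-byte KAFKA_RESERVED_HEADER_SIZE is taken from the note above):

# A broker limit below the configured max_bytes caps it:
750_000.clamp(5_000, 500_000) - 512   #=> 499_488
# A broker limit above the configured max_bytes leaves it unchanged:
750_000.clamp(5_000, 1_000_000) - 512 #=> 749_488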
save_inventory(inventory_json)
# File lib/topological_inventory/providers/common/save_inventory/saver.rb, line 96
def save_inventory(inventory_json)
  client.save_inventory_with_http_info(inventory_json)
end
save_payload_in_batches(inventory)
# File lib/topological_inventory/providers/common/save_inventory/saver.rb, line 40
def save_payload_in_batches(inventory)
  parts         = 0
  new_inventory = build_new_inventory(inventory)

  inventory[:collections].each do |collection|
    new_collection = build_new_collection(collection)

    data = collection[:data].map { |x| JSON.generate(x) }
    # Compute the size of each data item, plus 1 byte for the separating comma
    data_sizes = data.map { |x| x.size + 1 }

    # Size of the current inventory and new_collection wrappers, plus 2 bytes for the array brackets
    wrapper_size = JSON.generate(new_inventory).size + JSON.generate(new_collection).size + 2
    total_size   = wrapper_size
    counter      = 0
    data_sizes.each do |data_size|
      counter    += 1
      total_size += data_size

      if total_size > max_bytes
        # Back out the last data item, which pushed us over the max limit
        counter -= 1

        # Add the accumulated entities to the new collection, keeping the total size below the max
        if counter > 0
          new_collection[:data] = collection[:data].shift(counter)
          new_inventory[:collections] << new_collection
        end

        # Save the current batch
        serialize_and_save_inventory(new_inventory)
        parts += 1

        # Create new data containers for a new batch
        new_inventory  = build_new_inventory(inventory)
        new_collection = build_new_collection(collection)
        wrapper_size   = JSON.generate(new_inventory).size + JSON.generate(new_collection).size + 2

        # Start with the data part we've removed from the currently saved payload
        total_size = wrapper_size + data_size
        counter    = 1
      end
    end

    # Store the rest of the collection
    new_collection[:data] = collection[:data].shift(counter)
    new_inventory[:collections] << new_collection
  end

  # Save the rest
  serialize_and_save_inventory(new_inventory)
  parts += 1

  return parts
end
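
The size accounting above is intentionally conservative. A small self-contained check of the per-collection estimate (the collection contents are made up; the real method also adds the inventory wrapper's size):

require "json"

collection = {:name => "vms", :data => [{:id => 1}, {:id => 2}]}
data       = collection[:data].map { |x| JSON.generate(x) }
data_sizes = data.map { |x| x.size + 1 }  # +1 byte per separating comma
wrapper    = JSON.generate(:name => collection[:name], :data => []).size + 2
wrapper + data_sizes.sum                  #=> 44, the estimate
JSON.generate(collection).size            #=> 41, the actual size
# The estimate over-counts by the trailing comma and the 2 bytes of slack,
# so a batch that fits the estimate always fits the real limit.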
serialize_and_save_inventory(inventory)
# File lib/topological_inventory/providers/common/save_inventory/saver.rb, line 100
def serialize_and_save_inventory(inventory)
  payload = JSON.generate(inventory)
  if payload.size > max_bytes
    raise Exception::EntityTooLarge,
          "Entity is bigger than total limit and can't be split: #{payload}"
  end

  # Save the current batch
  save_inventory(payload)
end
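
Since a single entity larger than the limit cannot be split, callers may want to rescue this error. A sketch, assuming the exception class nests under the SaveInventory namespace as the raise above suggests:

begin
  saver.save(:inventory => inventory)
rescue TopologicalInventory::Providers::Common::SaveInventory::Exception::EntityTooLarge => e
  saver.logger.error("Skipping inventory that cannot be split: #{e.message}")
end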