class TopologicalInventory::Providers::Common::SaveInventory::Saver
Constants
- KAFKA_PAYLOAD_MAX_BYTES_DEFAULT
- KAFKA_RESERVED_HEADER_SIZE
As defined in github.com/zendesk/ruby-kafka/blob/02f7e2816e1130c5202764c275e36837f57ca4af/lib/kafka/protocol/message.rb#L11-L17, at least 112 bytes are added as a message header, so we need to keep room for that. We reserve 512 bytes, just to be safe.
Attributes
client[R]
logger[R]
max_bytes[R]
Public Class Methods
new(client:, logger:, max_bytes: KAFKA_PAYLOAD_MAX_BYTES_DEFAULT)
# File lib/topological_inventory/providers/common/save_inventory/saver.rb, line 15
def initialize(client:, logger:, max_bytes: KAFKA_PAYLOAD_MAX_BYTES_DEFAULT)
  @client    = client
  @logger    = logger
  @max_bytes = payload_max_size(max_bytes)
end
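For illustration, a minimal construction sketch. The api_client below is a hypothetical stand-in for any object that responds to save_inventory_with_http_info; it is not part of this gem:

require "logger"
require "topological_inventory/providers/common/save_inventory/saver"

# Hypothetical client; any object exposing save_inventory_with_http_info works
api_client = MyTopologyApiClient.new

saver = TopologicalInventory::Providers::Common::SaveInventory::Saver.new(
  :client => api_client,
  :logger => Logger.new($stdout)
  # :max_bytes is optional and defaults to KAFKA_PAYLOAD_MAX_BYTES_DEFAULT
)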
Public Instance Methods
save(data)
@return [Integer] The total number of parts the payload was divided into
# File lib/topological_inventory/providers/common/save_inventory/saver.rb, line 24
def save(data)
  inventory      = data[:inventory].to_hash
  inventory_json = JSON.generate(inventory)

  if inventory_json.size < max_bytes
    save_inventory(inventory_json)
    return 1
  else
    # GC can clean this up
    inventory_json = nil
    return save_payload_in_batches(inventory)
  end
end
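A usage sketch; collector_inventory is an assumption standing in for any object that responds to to_hash:

parts = saver.save(:inventory => collector_inventory)
logger.info("Inventory saved in #{parts} part(s)")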
Private Instance Methods
build_new_collection(collection)
# File lib/topological_inventory/providers/common/save_inventory/saver.rb, line 111
def build_new_collection(collection)
  {:name => collection[:name], :data => []}
end
build_new_inventory(inventory)
# File lib/topological_inventory/providers/common/save_inventory/saver.rb, line 115
def build_new_inventory(inventory)
  new_inventory = inventory.clone
  new_inventory[:refresh_state_part_uuid] = SecureRandom.uuid
  new_inventory[:collections] = []
  new_inventory
end
payload_max_size(max_bytes)
# File lib/topological_inventory/providers/common/save_inventory/saver.rb, line 122
def payload_max_size(max_bytes)
  if ENV['KAFKA_PAYLOAD_MAX_BYTES']
    max_bytes.clamp(5_000, ENV['KAFKA_PAYLOAD_MAX_BYTES'].to_i) - KAFKA_RESERVED_HEADER_SIZE
  else
    max_bytes - KAFKA_RESERVED_HEADER_SIZE
  end
end
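To make the clamping concrete, a worked example with illustrative values (KAFKA_RESERVED_HEADER_SIZE is 512, per the constant description above):

# With ENV['KAFKA_PAYLOAD_MAX_BYTES'] = '750000' and max_bytes = 1_000_000:
1_000_000.clamp(5_000, 750_000) - 512 #=> 749_488

# With the environment variable unset, the reserve is subtracted directly:
1_000_000 - 512 #=> 999_488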
save_inventory(inventory_json)
# File lib/topological_inventory/providers/common/save_inventory/saver.rb, line 96
def save_inventory(inventory_json)
  client.save_inventory_with_http_info(inventory_json)
end
save_payload_in_batches(inventory)
# File lib/topological_inventory/providers/common/save_inventory/saver.rb, line 40
def save_payload_in_batches(inventory)
  parts = 0
  new_inventory = build_new_inventory(inventory)

  inventory[:collections].each do |collection|
    new_collection = build_new_collection(collection)

    data = collection[:data].map { |x| JSON.generate(x) }
    # Compute the size of each data item, plus 1 byte for the comma
    data_sizes = data.map { |x| x.size + 1 }

    # Size of the current inventory and new_collection wrapper, plus 2 bytes for the array brackets
    wrapper_size = JSON.generate(new_inventory).size + JSON.generate(new_collection).size + 2

    total_size = wrapper_size
    counter    = 0
    data_sizes.each do |data_size|
      counter    += 1
      total_size += data_size

      if total_size > max_bytes
        # Remove the last data item, which pushed the size over the max limit
        counter -= 1

        # Add the entities to the new collection, so the total size stays below max
        if counter > 0
          new_collection[:data] = collection[:data].shift(counter)
          new_inventory[:collections] << new_collection
        end

        # Save the current batch
        serialize_and_save_inventory(new_inventory)
        parts += 1

        # Create new data containers for a new batch
        new_inventory  = build_new_inventory(inventory)
        new_collection = build_new_collection(collection)
        wrapper_size   = JSON.generate(new_inventory).size + JSON.generate(new_collection).size + 2

        # Start with the data item we removed from the batch just saved
        total_size = wrapper_size + data_size
        counter    = 1
      end
    end

    # Store the rest of the collection
    new_collection[:data] = collection[:data].shift(counter)
    new_inventory[:collections] << new_collection
  end

  # Save the rest
  serialize_and_save_inventory(new_inventory)
  parts += 1

  return parts
end
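A sketch of observing the batching from the outside, via save. RecordingClient is a hypothetical stub of ours, not part of the gem, and big_inventory is an assumption; each part produced is a complete inventory document with a fresh refresh_state_part_uuid from build_new_inventory:

class RecordingClient
  attr_reader :payloads

  def initialize
    @payloads = []
  end

  # Same interface the Saver expects from its client
  def save_inventory_with_http_info(json)
    @payloads << json
  end
end

client = RecordingClient.new
saver  = TopologicalInventory::Providers::Common::SaveInventory::Saver.new(
  :client => client, :logger => Logger.new($stdout), :max_bytes => 10_000
)

parts = saver.save(:inventory => big_inventory)
parts == client.payloads.size #=> true, one saved payload per part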
serialize_and_save_inventory(inventory)
# File lib/topological_inventory/providers/common/save_inventory/saver.rb, line 100
def serialize_and_save_inventory(inventory)
  payload = JSON.generate(inventory)
  if payload.size > max_bytes
    raise Exception::EntityTooLarge, "Entity is bigger than total limit and can't be split: #{payload}"
  end

  # Save the current batch
  save_inventory(payload)
end
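When even a single data item makes a batch exceed max_bytes, the payload cannot be split any further and Exception::EntityTooLarge is raised. Assuming that relative reference resolves under the SaveInventory namespace, as the source above suggests, a caller can rescue it like this:

begin
  saver.save(:inventory => inventory)
rescue TopologicalInventory::Providers::Common::SaveInventory::Exception::EntityTooLarge => e
  logger.error("Entity exceeds the payload limit and can't be split: #{e.message}")
end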