class Sqrewdriver::Client::SendingBuffer
Attributes
chunks[R]
Public Class Methods
new(client:, queue_url:, serializer:, thread_pool:)
click to toggle source
Calls superclass method
# File lib/sqrewdriver/client.rb, line 68
# Builds a buffer with an empty chunk list.
#
# client      - receives +send_message_batch+ when a chunk is flushed.
# queue_url   - forwarded as +queue_url:+ on every batch send.
# serializer  - object whose +dump+ encodes message bodies.
# thread_pool - executor handed to Concurrent::Promises.future_on.
#
# NOTE(review): @flush_retry_count is read by send_message_batch_with_retry
# but is not assigned here - confirm where it is meant to come from.
def initialize(client:, queue_url:, serializer:, thread_pool:)
  super()

  # Delivery collaborators.
  @client = client
  @queue_url = queue_url
  @thread_pool = thread_pool

  # Buffering state.
  @serializer = serializer
  @chunks = Concurrent::Array.new
end
Public Instance Methods
add_aggregated_messages(messages)
click to toggle source
# File lib/sqrewdriver/client.rb, line 94
# Folds several messages into a single entry and appends it to the buffer.
#
# The bodies of all +messages+ are serialized together in one +dump+ call and
# the result replaces the body of the first message (mutated in place); only
# that first message is stored. Its size is computed from the serialized body
# and its :message_attributes.
def add_aggregated_messages(messages)
  carrier = messages.first
  # Collect the bodies before the carrier's own body is overwritten below.
  payload = @serializer.dump(messages.map { |msg| msg[:message_body] })
  carrier[:message_body] = payload
  payload_size = calculate_message_size(payload, carrier[:message_attributes])

  synchronize do
    @chunks.push(Chunk.new) if @chunks.empty?

    tail = @chunks.last
    if tail.size != MAX_BATCH_SIZE && tail.bytesize + payload_size <= MAX_PAYLOAD_SIZE
      tail.add(carrier, payload_size)
    else
      # Tail is at its entry or byte limit: start a new chunk for this entry.
      overflow = Chunk.new
      overflow.add(carrier, payload_size)
      @chunks.push(overflow)
    end
  end
end
add_message(message)
click to toggle source
# File lib/sqrewdriver/client.rb, line 77
# Serializes one message and appends it to the buffer.
#
# The message's :message_body is replaced in place by the serialized form.
# The entry goes into the last chunk unless that chunk already holds
# MAX_BATCH_SIZE entries or would exceed MAX_PAYLOAD_SIZE bytes, in which
# case a new chunk is started.
#
# NOTE(review): the size calculation reads message[:attributes], whereas
# add_aggregated_messages reads :message_attributes - confirm which key is
# intended.
def add_message(message)
  payload = @serializer.dump(message[:message_body])
  message[:message_body] = payload
  payload_size = calculate_message_size(payload, message[:attributes])

  synchronize do
    @chunks.push(Chunk.new) if @chunks.empty?

    tail = @chunks.last
    if tail.size != MAX_BATCH_SIZE && tail.bytesize + payload_size <= MAX_PAYLOAD_SIZE
      tail.add(message, payload_size)
    else
      # Tail is at its entry or byte limit: start a new chunk for this entry.
      overflow = Chunk.new
      overflow.add(message, payload_size)
      @chunks.push(overflow)
    end
  end
end
has_full_chunk?()
click to toggle source
# File lib/sqrewdriver/client.rb, line 114
# True once the buffer holds two or more chunks. add_message only starts a
# second chunk when the previous one could not take another entry, so a
# second chunk implies the first is ready to send.
def has_full_chunk?
  @chunks.length >= 2
end
send_first_chunk_async()
click to toggle source
# File lib/sqrewdriver/client.rb, line 118
# Schedules delivery of the oldest chunk on the thread pool and returns the
# resulting future.
#
# Inside the future the first chunk is removed from the buffer under the
# lock; if the buffer was empty nothing is sent. Each entry is given an :id
# equal to its position in the chunk (as a string) before the batch is sent.
def send_first_chunk_async
  Concurrent::Promises.future_on(@thread_pool, @chunks) do |pending|
    head = synchronize { pending.shift }
    next unless head

    head.data.each_with_index { |entry, position| entry[:id] = position.to_s }
    send_message_batch_with_retry(entries: head.data)
  end
end
Private Instance Methods
calculate_message_size(body, attributes)
click to toggle source
# File lib/sqrewdriver/client.rb, line 133
# Returns the number of bytes a message contributes to a chunk: the bytes of
# +body+ plus, for every attribute, the bytes of its name, its :data_type and
# whichever value fields are present.
#
# body       - String, the serialized message body.
# attributes - Hash of attribute name => attribute Hash, or nil. An attribute
#              Hash may carry :data_type, :string_value, :binary_value,
#              :string_list_values and :binary_list_values; absent fields
#              count as zero bytes.
def calculate_message_size(body, attributes)
  sum = body.bytesize
  attributes&.each do |name, attribute|
    # to_s lets Symbol names and nil (absent) fields be measured uniformly.
    sum += name.to_s.bytesize
    sum += attribute[:data_type].to_s.bytesize
    sum += attribute[:string_value].to_s.bytesize
    sum += attribute[:binary_value].to_s.bytesize
    # Array(nil) is [], so a missing list contributes nothing.
    sum += Array(attribute[:string_list_values]).sum(&:bytesize)
    sum += Array(attribute[:binary_list_values]).sum(&:bytesize)
  end
  sum
end
send_message_batch_with_retry(entries:, retry_count: 0)
click to toggle source
# File lib/sqrewdriver/client.rb, line 146
# Sends +entries+ as one batch and re-sends whichever entries the response
# reports as failed.
#
# entries     - Array of entry Hashes; each is expected to carry the :id that
#               failed results refer back to.
# retry_count - number of retries already performed (callers normally omit it).
#
# Returns nil once a response reports no failures.
# Raises SendMessageBatchFailure (built from the failed results) when entries
# are still failing after @flush_retry_count retries.
def send_message_batch_with_retry(entries:, retry_count: 0)
  resp = @client.send_message_batch(queue_url: @queue_url, entries: entries)
  return if resp.failed.empty?

  # An unset retry limit (nil) is treated as 0, so the failure surfaces as
  # SendMessageBatchFailure instead of an Integer-vs-nil comparison error.
  raise SendMessageBatchFailure.new(resp.failed) if retry_count >= @flush_retry_count.to_i

  # Select by :id, not by array position: after the first retry +entries+ is
  # already a subset, so an id no longer equals the entry's index in it.
  failed_ids = resp.failed.map { |failure| failure.id.to_s }
  failed_entries = entries.select { |entry| failed_ids.include?(entry[:id].to_s) }
  send_message_batch_with_retry(entries: failed_entries, retry_count: retry_count + 1)
end