class Sqrewdriver::Client::SendingBuffer

Attributes

chunks[R]

Public Class Methods

new(client:, queue_url:, serializer:, thread_pool:) click to toggle source
Calls superclass method
# File lib/sqrewdriver/client.rb, line 68
def initialize(client:, queue_url:, serializer:, thread_pool:)
  super()
  @client = client
  @queue_url = queue_url
  @chunks = Concurrent::Array.new
  @serializer = serializer
  @thread_pool = thread_pool
end

Public Instance Methods

add_aggregated_messages(messages) click to toggle source
# File lib/sqrewdriver/client.rb, line 94
def add_aggregated_messages(messages)
  base_message = messages[0]

  message_bodies = messages.map { |m| m[:message_body] }
  serialized = @serializer.dump(message_bodies)
  base_message[:message_body] = serialized
  add_size = calculate_message_size(serialized, base_message[:message_attributes])

  synchronize do
    @chunks << Chunk.new if @chunks.empty?
    if @chunks.last.size == MAX_BATCH_SIZE || @chunks.last.bytesize + add_size > MAX_PAYLOAD_SIZE
      new_chunk = Chunk.new
      new_chunk.add(base_message, add_size)
      @chunks << new_chunk
    else
      @chunks.last.add(base_message, add_size)
    end
  end
end
add_message(message) click to toggle source
# File lib/sqrewdriver/client.rb, line 77
def add_message(message)
  serialized = @serializer.dump(message[:message_body])
  message[:message_body] = serialized
  add_size = calculate_message_size(serialized, message[:attributes])

  synchronize do
    @chunks << Chunk.new if @chunks.empty?
    if @chunks.last.size == MAX_BATCH_SIZE || @chunks.last.bytesize + add_size > MAX_PAYLOAD_SIZE
      new_chunk = Chunk.new
      new_chunk.add(message, add_size)
      @chunks << new_chunk
    else
      @chunks.last.add(message, add_size)
    end
  end
end
has_full_chunk?() click to toggle source
# File lib/sqrewdriver/client.rb, line 114
def has_full_chunk?
  @chunks.size > 1
end
send_first_chunk_async() click to toggle source
# File lib/sqrewdriver/client.rb, line 118
def send_first_chunk_async
  Concurrent::Promises.future_on(@thread_pool, @chunks) do |chunks|
    sending = synchronize { chunks.shift }
    if sending
      sending.data.each_with_index do |params, idx|
        params[:id] = idx.to_s
        params
      end
      send_message_batch_with_retry(entries: sending.data)
    end
  end
end

Private Instance Methods

calculate_message_size(body, attributes) click to toggle source
# File lib/sqrewdriver/client.rb, line 133
def calculate_message_size(body, attributes)
  sum = body.bytesize
  attributes&.each do |n, a|
    sum += n.bytesize
    sum += a[:data_type].bytesize
    sum += a[:string_value].bytesyze
    sum += a[:binary_value].bytesyze
    sum += a[:string_list_values].sum(&:bytesize)
    sum += a[:binaly_list_values].sum(&:bytesize)
  end
  sum
end
send_message_batch_with_retry(entries:, retry_count: 0) click to toggle source
# File lib/sqrewdriver/client.rb, line 146
def send_message_batch_with_retry(entries:, retry_count: 0)
  resp = @client.send_message_batch(queue_url: @queue_url, entries: entries)

  unless resp.failed.empty?
    raise SendMessageBatchFailure.new(resp.failed) if retry_count >= @flush_retry_count

    failed_entries = resp.failed.map { |f| entries[f.id.to_i] }
    send_message_batch_with_retry(entries: failed_entries, retry_count: retry_count + 1)
  end
end