class Sqrewdriver::Client

Constants

MAX_BATCH_SIZE
MAX_PAYLOAD_SIZE

Public Class Methods

new(queue_url:, client: nil, threads: 32, serializer: Sqrewdriver.default_serializer, aggregate_messages_per: nil, flush_retry_count: 5, **options) click to toggle source
# File lib/sqrewdriver/client.rb, line 12
def initialize(queue_url:, client: nil, threads: 32, serializer: Sqrewdriver.default_serializer, aggregate_messages_per: nil, flush_retry_count: 5, **options)
  if client
    @client = client
  else
    @client = Aws::SQS::Client.new(options)
  end

  @queue_url = queue_url
  @message_buffer = Concurrent::Array.new
  @thread_pool = Concurrent::FixedThreadPool.new(threads)
  @flush_retry_count = flush_retry_count
  @waiting_futures = Concurrent::Set.new
  @flush_mutex = Mutex.new
  @aggregate_messages_per = aggregate_messages_per

  ensure_serializer_for_aggregation!(serializer)

  @sending_buffer = SendingBuffer.new(client: @client, queue_url: queue_url, serializer: serializer, thread_pool: @thread_pool)
end

Public Instance Methods

flush(timeout = nil) click to toggle source
# File lib/sqrewdriver/client.rb, line 186
def flush(timeout = nil)
  flush_async
  wait_flushing(timeout)
end
flush_async() click to toggle source
# File lib/sqrewdriver/client.rb, line 158
def flush_async
  until @message_buffer.empty? do
    if @aggregate_messages_per
      messages = @message_buffer.shift(@aggregate_messages_per)
      @sending_buffer.add_aggregated_messages(messages)
    else
      message = @message_buffer.shift
      @sending_buffer.add_message(message)
    end

    if @sending_buffer.has_full_chunk?
      send_first_chunk_async
    end
  end

  send_first_chunk_async
end
send_message_buffered(message) click to toggle source

Add a message to buffer.

If count of buffered messages exceed 10 or aggregate_messages_per else if sum of message size exceeds 256KB, send payload to SQS asynchronously.

# File lib/sqrewdriver/client.rb, line 37
def send_message_buffered(message)
  add_message_to_buffer(message)

  if need_flush?
    flush_async
  end
end
wait_flushing(timeout = nil) click to toggle source
# File lib/sqrewdriver/client.rb, line 176
def wait_flushing(timeout = nil)
  zipped = Concurrent::Promises.zip_futures_on(@thread_pool, *@waiting_futures)
  unless zipped.wait(timeout)
    raise Sqrewdriver::SendMessageTimeout
  end

  exceptions = zipped.reason
  raise Sqrewdriver::SendMessageErrors.new(exceptions) if exceptions
end

Private Instance Methods

add_message_to_buffer(message) click to toggle source
# File lib/sqrewdriver/client.rb, line 193
def add_message_to_buffer(message)
  @message_buffer << message
end
ensure_serializer_for_aggregation!(serializer) click to toggle source
# File lib/sqrewdriver/client.rb, line 209
def ensure_serializer_for_aggregation!(serializer)
  valid_serializer = @aggregate_messages_per.nil? || serializer.is_a?(Sqrewdriver::Serdes::JSONSerde)
  unless valid_serializer
    raise InvalidSerializer, "If you use `aggregate_messages_per`, serializer must be `Sqrewdriver::Serdes::JSONSerde`"
  end
end
need_flush?() click to toggle source
# File lib/sqrewdriver/client.rb, line 197
def need_flush?
  @message_buffer.length >= (@aggregate_messages_per&.*(10) || 10)
end
send_first_chunk_async() click to toggle source
# File lib/sqrewdriver/client.rb, line 201
def send_first_chunk_async
  future = @sending_buffer.send_first_chunk_async
  @waiting_futures << future
  future.on_resolution_using(@thread_pool) do |fulfilled, value, reason|
    @waiting_futures.delete(future)
  end
end