class Sqrewdriver::Client
Constants
- MAX_BATCH_SIZE
- MAX_PAYLOAD_SIZE
Public Class Methods
new(queue_url:, client: nil, threads: 32, serializer: Sqrewdriver.default_serializer, aggregate_messages_per: nil, flush_retry_count: 5, **options)
click to toggle source
# File lib/sqrewdriver/client.rb, line 12
# Builds a buffered SQS client.
#
# @param queue_url [String] queue URL; stored and handed to the SendingBuffer
# @param client [Object, nil] SQS client to use; when falsy, a new
#   Aws::SQS::Client is built from +options+
# @param threads [Integer] size of the Concurrent::FixedThreadPool
# @param serializer [Object] serializer handed to the SendingBuffer; checked
#   by ensure_serializer_for_aggregation! before the buffer is created
# @param aggregate_messages_per [Integer, nil] how many buffered messages are
#   shifted off and aggregated at a time (nil disables aggregation)
# @param flush_retry_count [Integer] stored in @flush_retry_count
# @param options [Hash] forwarded to Aws::SQS::Client.new when no client is given
# @raise [InvalidSerializer] when aggregation is requested with a serializer
#   that is not a Sqrewdriver::Serdes::JSONSerde
def initialize(queue_url:, client: nil, threads: 32, serializer: Sqrewdriver.default_serializer,
               aggregate_messages_per: nil, flush_retry_count: 5, **options)
  # Prefer the injected client; fall back to a freshly built one.
  @client = client || Aws::SQS::Client.new(options)
  @queue_url = queue_url

  @message_buffer = Concurrent::Array.new
  @thread_pool = Concurrent::FixedThreadPool.new(threads)
  @flush_retry_count = flush_retry_count
  @waiting_futures = Concurrent::Set.new
  @flush_mutex = Mutex.new

  # The aggregation setting must be assigned before the serializer check,
  # because the check reads @aggregate_messages_per.
  @aggregate_messages_per = aggregate_messages_per
  ensure_serializer_for_aggregation!(serializer)

  @sending_buffer = SendingBuffer.new(
    client: @client,
    queue_url: queue_url,
    serializer: serializer,
    thread_pool: @thread_pool
  )
end
Public Instance Methods
flush(timeout = nil)
click to toggle source
# File lib/sqrewdriver/client.rb, line 186
# Starts an asynchronous flush and then waits for it via wait_flushing.
#
# @param timeout [Numeric, nil] passed through to wait_flushing
# @return whatever wait_flushing returns
def flush(timeout = nil)
  flush_async
  wait_flushing(timeout)
end
flush_async()
click to toggle source
# File lib/sqrewdriver/client.rb, line 158
# Drains @message_buffer into @sending_buffer, dispatching a chunk each time
# the sending buffer reports a full one, then dispatches once more after the
# message buffer is empty.
#
# @return the result of the final send_first_chunk_async call
def flush_async
  until @message_buffer.empty?
    if @aggregate_messages_per
      # Take up to @aggregate_messages_per messages at once and hand them
      # over as a single aggregated group.
      batch = @message_buffer.shift(@aggregate_messages_per)
      @sending_buffer.add_aggregated_messages(batch)
    else
      @sending_buffer.add_message(@message_buffer.shift)
    end

    send_first_chunk_async if @sending_buffer.has_full_chunk?
  end

  # Dispatch once more for whatever is left after draining.
  send_first_chunk_async
end
send_message_buffered(message)
click to toggle source
Add a message to buffer.
If the number of buffered messages reaches 10 (or 10 × aggregate_messages_per when aggregation is enabled), or if the total message size exceeds 256KB, the payload is sent to SQS asynchronously.
# File lib/sqrewdriver/client.rb, line 37
# Appends +message+ to the buffer and starts an asynchronous flush when
# need_flush? reports that the buffer is large enough.
#
# @param message [Object] message to buffer
# @return the result of flush_async when a flush was started, otherwise nil
def send_message_buffered(message)
  add_message_to_buffer(message)
  flush_async if need_flush?
end
wait_flushing(timeout = nil)
click to toggle source
# File lib/sqrewdriver/client.rb, line 176
# Waits for every future currently held in @waiting_futures.
#
# @param timeout [Numeric, nil] passed to the combined future's +wait+
# @return [nil] when the combined future has no +reason+
# @raise [Sqrewdriver::SendMessageTimeout] when +wait+ returns a falsy value
# @raise [Sqrewdriver::SendMessageErrors] built from the combined future's
#   +reason+ when that is non-nil
def wait_flushing(timeout = nil)
  # Combine all outstanding futures into one so a single wait covers them.
  combined = Concurrent::Promises.zip_futures_on(@thread_pool, *@waiting_futures)
  raise Sqrewdriver::SendMessageTimeout unless combined.wait(timeout)

  failures = combined.reason
  return unless failures

  raise Sqrewdriver::SendMessageErrors.new(failures)
end
Private Instance Methods
add_message_to_buffer(message)
click to toggle source
# File lib/sqrewdriver/client.rb, line 193
# Appends +message+ to the end of @message_buffer.
#
# @param message [Object] message to buffer
# @return the message buffer itself
def add_message_to_buffer(message)
  @message_buffer.push(message)
end
ensure_serializer_for_aggregation!(serializer)
click to toggle source
# File lib/sqrewdriver/client.rb, line 209
# Verifies that the serializer is usable with message aggregation.
#
# Passes when aggregation is disabled (@aggregate_messages_per is nil) or
# when +serializer+ is a Sqrewdriver::Serdes::JSONSerde.
#
# @param serializer [Object] serializer to check
# @return [nil] when the serializer is acceptable
# @raise [InvalidSerializer] otherwise
def ensure_serializer_for_aggregation!(serializer)
  return if @aggregate_messages_per.nil?
  return if serializer.is_a?(Sqrewdriver::Serdes::JSONSerde)

  raise InvalidSerializer, "If you use `aggregate_messages_per`, serializer must be `Sqrewdriver::Serdes::JSONSerde`"
end
need_flush?()
click to toggle source
# File lib/sqrewdriver/client.rb, line 197
# Tells whether enough messages are buffered to start a flush.
#
# The threshold is 10 messages, or 10 times @aggregate_messages_per when
# that is set.
#
# @return [Boolean] true when @message_buffer holds at least the threshold
def need_flush?
  per_aggregate = @aggregate_messages_per
  threshold = per_aggregate.nil? ? 10 : per_aggregate * 10
  @message_buffer.length >= threshold
end
send_first_chunk_async()
click to toggle source
# File lib/sqrewdriver/client.rb, line 201
# Asks @sending_buffer to send its first chunk asynchronously and tracks the
# returned future in @waiting_futures until it resolves.
#
# @return the result of registering the resolution callback on the future
def send_first_chunk_async
  pending = @sending_buffer.send_first_chunk_async
  @waiting_futures << pending

  # Once resolved, however it ended, the future no longer needs to be
  # waited on, so it is dropped from the tracking set.
  pending.on_resolution_using(@thread_pool) { |_fulfilled, _value, _reason| @waiting_futures.delete(pending) }
end