class LogStash::Outputs::CloudWatchLogs::Buffer
This class buffers a series of single items into batches and puts the batches on a SizedQueue for consumption. A buffer consists of an ongoing batch and an out queue. An item is first added to the ongoing batch; when the ongoing batch becomes ready for consumption, it is added to the out queue and emptied. An ongoing batch becomes ready for consumption if adding one more item would make the number of items exceed max_batch_count or the total size of the items exceed max_batch_size, or if the batch has been open for more than buffer_duration milliseconds and contains at least one item.
Constants
- CLOSE_BATCH
Attributes
Public Class Methods
Creates a new buffer
# File lib/logstash/outputs/cloudwatchlogs.rb, line 288
def initialize(options = {})
  @max_batch_count = options.fetch(:max_batch_count)
  @max_batch_size = options.fetch(:max_batch_size)
  @buffer_duration = options.fetch(:buffer_duration)
  @out_queue_size = options.fetch(:out_queue_size, 10)
  @logger = options.fetch(:logger, nil)
  @size_of_item_proc = options.fetch(:size_of_item_proc)
  @in_batch = Array.new
  @in_count = 0
  @in_size = 0
  @out_queue = SizedQueue.new(@out_queue_size)
  @batch_update_mutex = Mutex.new
  @last_batch_time = Time.now
  if @buffer_duration > 0
    @scheduled_batcher = Thread.new do
      loop do
        sleep(@buffer_duration / 1000.0)
        enq(:scheduled)
      end
    end
  end
end
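The constructor reads every setting through Hash#fetch, so some keys are required and others have defaults. The following is a minimal sketch of that behavior using a plain hash rather than the Buffer class itself; the option values are illustrative assumptions, not recommended settings.

```ruby
# Hypothetical options hash; the keys mirror those read by initialize.
options = {
  max_batch_count: 3,
  max_batch_size: 100,
  buffer_duration: 0, # not greater than 0, so no scheduled batcher thread would be started
  size_of_item_proc: ->(item) { item.bytesize }
}

# Optional keys fall back to the defaults shown in the listing.
out_queue_size = options.fetch(:out_queue_size, 10)
logger = options.fetch(:logger, nil)

# Required keys are fetched without a default, so a missing key raises KeyError.
missing_key_raises = begin
  {}.fetch(:max_batch_count)
  false
rescue KeyError
  true
end
```

In this sketch out_queue_size ends up as 10 and logger as nil, while omitting max_batch_count fails immediately instead of producing a half-configured buffer.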
Public Instance Methods
Closes the buffer
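Adds the current batch to the out queue and then adds CLOSE_BATCH to the queue. Because the out queue is a SizedQueue, the call blocks while the queue is full, so it returns only after the consumer has made room for the final batch and the CLOSE_BATCH marker.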
# File lib/logstash/outputs/cloudwatchlogs.rb, line 337
def close
  while @in_size != 0 do
    enq(:close)
    sleep(1)
  end
  @out_queue.enq(CLOSE_BATCH)
end
Dequeues batches that are ready for consumption and passes each one to the given block
The caller blocks on this call until the buffer is closed.
# File lib/logstash/outputs/cloudwatchlogs.rb, line 348
def deq(&proc)
  loop do
    batch = @out_queue.deq
    if batch == CLOSE_BATCH
      break
    end
    proc.call(batch)
  end
end
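The consumer side can be illustrated with a bare SizedQueue and a marker value. This is a sketch of the pattern only: close_marker is a hypothetical stand-in for CLOSE_BATCH, whose actual value is not shown on this page, and the batches are made-up strings.

```ruby
close_marker = :close_batch # hypothetical stand-in for CLOSE_BATCH
out_queue = SizedQueue.new(2) # enq blocks once two batches are waiting
consumed = []

# Consumer: same shape as deq, looping until the marker arrives.
consumer = Thread.new do
  loop do
    batch = out_queue.deq # blocks while the queue is empty
    break if batch == close_marker
    consumed << batch
  end
end

# Producer: two batches, then the marker that ends the consumer loop.
out_queue.enq(%w[a b])
out_queue.enq(%w[c])
out_queue.enq(close_marker)
consumer.join
```

Because the marker travels through the same queue as the batches, the consumer sees every batch enqueued before it and stops only afterwards.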
Enqueues an item to the buffer
- If the ongoing batch is not full after this addition, adds the item to the batch.
- If the ongoing batch becomes full with this addition, adds the item to the batch and adds the batch to the out queue.
- If the ongoing batch would overflow with this addition, adds the batch to the out queue and then adds the item to the new batch.
# File lib/logstash/outputs/cloudwatchlogs.rb, line 317
def enq(item)
  @batch_update_mutex.synchronize do
    if item == :scheduled || item == :close
      add_current_batch_to_out_queue(item)
      return
    end
    status = try_add_item(item)
    if status != 0
      add_current_batch_to_out_queue(:add)
      if status == -1
        try_add_item(item)
      end
    end
  end
end
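The three cases can be traced with a single-threaded sketch that mirrors the logic of enq and its helpers using lambdas and local variables. It is not the Buffer class: the mutex, the timer thread, and the SizedQueue are left out, out_batches is a plain array standing in for the out queue, and the limits and items are assumptions chosen to trigger each case.

```ruby
max_batch_count = 3
max_batch_size = 10
size_of = ->(item) { item.bytesize } # plays the role of size_of_item_proc

in_batch = []
in_size = 0
out_batches = [] # stand-in for the out queue

# Moves a non-empty ongoing batch to the out queue and starts a new one.
flush = lambda do
  next if in_batch.empty?
  out_batches << in_batch
  in_batch = []
  in_size = 0
end

# Mirrors try_add_item: 1 = added and now full, -1 = rejected, 0 = added.
try_add = lambda do |item|
  item_size = size_of.call(item)
  if in_batch.size + 1 == max_batch_count || in_size + item_size == max_batch_size
    in_batch << item
    in_size += item_size
    1
  elsif in_size + item_size > max_batch_size
    -1
  else
    in_batch << item
    in_size += item_size
    0
  end
end

# Mirrors the item branch of enq.
enq = lambda do |item|
  status = try_add.call(item)
  if status != 0
    flush.call
    try_add.call(item) if status == -1
  end
end

%w[aa bb cc dddddddd eee f].each { |item| enq.call(item) }
```

With these inputs, "cc" fills the first batch by count, "eee" would overflow the batch holding "dddddddd" and therefore starts a new one, and "eee" and "f" remain in the ongoing batch.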
Private Instance Methods
Adds batch to out queue
# File lib/logstash/outputs/cloudwatchlogs.rb, line 385
def add_current_batch_to_out_queue(from)
  if from == :scheduled && (Time.now - @last_batch_time) * 1000 < @buffer_duration
    return
  end
  if @in_batch.size == 0
    @last_batch_time = Time.now
    return
  end
  @logger.debug("Added batch with #{in_count} items in #{in_size} by #{from}") if @logger
  @out_queue.enq(@in_batch)
  @in_batch = Array.new
  @in_count = 0
  @in_size = 0
  @last_batch_time = Time.now
end
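The first guard in the listing compares elapsed time in milliseconds against buffer_duration, so a scheduled flush is skipped when the batch was flushed recently for another reason. A small sketch of that comparison, with a hypothetical helper name and fixed timestamps in place of Time.now:

```ruby
# Hypothetical helper: true when a :scheduled flush should be skipped.
# Subtracting two Time objects yields seconds as a Float, hence the * 1000.
skip_scheduled_flush = lambda do |now, last_batch_time, buffer_duration_ms|
  (now - last_batch_time) * 1000 < buffer_duration_ms
end

last_batch_time = Time.at(100)

# 2 seconds after the last batch, with a 5000 ms duration: too early.
too_early = skip_scheduled_flush.call(Time.at(102), last_batch_time, 5000)

# 6 seconds after the last batch: the scheduled flush may proceed.
due = !skip_scheduled_flush.call(Time.at(106), last_batch_time, 5000)
```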
Adds item to batch
# File lib/logstash/outputs/cloudwatchlogs.rb, line 378
def add_item(item)
  @in_batch << item
  @in_count += 1
  @in_size += @size_of_item_proc.call(item)
end
Tries to add an item to buffer
# File lib/logstash/outputs/cloudwatchlogs.rb, line 360
def try_add_item(item)
  item_size = @size_of_item_proc.call(item)
  if @in_count + 1 == @max_batch_count || @in_size + item_size == @max_batch_size
    # accept item, but can't accept more items
    add_item(item)
    return 1
  elsif @in_size + item_size > @max_batch_size
    # cannot accept item
    return -1
  else
    add_item(item)
    # accept item, and may accept next item
    return 0
  end
end
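The return value is a three-way status code. The sketch below reproduces only the branch order of the listing as a side-effect-free lambda (status_for is a hypothetical name, and the numbers are made up) to show which code each situation yields. One consequence of the order visible in the listing: when an item both reaches max_batch_count and pushes the size past max_batch_size, the count check is evaluated first and the item is accepted.

```ruby
# Hypothetical pure version of the branch order in try_add_item.
status_for = lambda do |in_count, in_size, item_size, max_count, max_size|
  if in_count + 1 == max_count || in_size + item_size == max_size
    1  # accepted, batch is now full
  elsif in_size + item_size > max_size
    -1 # rejected, batch would overflow
  else
    0  # accepted, room may remain
  end
end

room_left   = status_for.call(0, 0, 4, 3, 10) # well under both limits
fills_count = status_for.call(2, 4, 2, 3, 10) # third of three items
fills_size  = status_for.call(1, 6, 4, 3, 10) # size lands exactly on the limit
overflows   = status_for.call(1, 8, 4, 3, 10) # size would pass the limit
count_wins  = status_for.call(2, 8, 4, 3, 10) # count limit reached and size passed
```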