class LogStash::Outputs::CloudWatchLogs::Buffer

This class buffers series of single item to batches and puts batches to a SizedQueue for consumption. A buffer includes an ongoing batch and an out queue. An item is added to the ongoing batch first, when the ongoing batch becomes to ready for consumption and then is added to out queue/emptified. An ongoing batch becomes to comsumption ready if the number of items is going to exceed max_batch_count, or the size of items is going to exceed max_batch_size, with the addition of one more item, or the batch has opend more than buffer_duration milliseconds and has at least one item.

Constants

CLOSE_BATCH

Attributes

in_batch[R]
in_count[R]
in_size[R]
out_queue[R]

Public Class Methods

new(options = {}) click to toggle source

Creates a new buffer

# File lib/logstash/outputs/cloudwatchlogs.rb, line 272
def initialize(options = {})
  @max_batch_count = options.fetch(:max_batch_count)
  @max_batch_size = options.fetch(:max_batch_size)
  @buffer_duration = options.fetch(:buffer_duration)
  @out_queue_size = options.fetch(:out_queue_size, 10)
  @logger = options.fetch(:logger, nil)
  @size_of_item_proc = options.fetch(:size_of_item_proc)
  @in_batch = Array.new
  @in_count = 0
  @in_size = 0
  @out_queue = SizedQueue.new(@out_queue_size)
  @batch_update_mutex = Mutex.new
  @last_batch_time = Time.now
  if @buffer_duration > 0
    @scheduled_batcher = Thread.new do
      loop do
        sleep(@buffer_duration / 1000.0)
        enq(:scheduled)
      end
    end
  end
end

Public Instance Methods

close() click to toggle source

Closes the buffer

Adds current batch to the queue and adds CLOSE_BATCH to queue. Waits until consumer completes.

# File lib/logstash/outputs/cloudwatchlogs.rb, line 321
def close
  while @in_size != 0 do
    enq(:close)
    sleep(1)
  end
  @out_queue.enq(CLOSE_BATCH)
end
deq(&proc) click to toggle source

Deques ready for consumption batches

The caller blocks on this call until the buffer is closed.

# File lib/logstash/outputs/cloudwatchlogs.rb, line 332
def deq(&proc)
  loop do
    batch = @out_queue.deq
    if batch == CLOSE_BATCH
      break
    end
    proc.call(batch)
  end
end
enq(item) click to toggle source

Enques an item to buffer

  • If ongoing batch is not full with this addition, adds item to batch.

  • If ongoing batch is full with this addition, adds item to batch and add batch to out queue.

  • If ongoing batch is going to overflow with this addition, adds batch to out queue,

and then adds the item to the new batch

# File lib/logstash/outputs/cloudwatchlogs.rb, line 301
def enq(item)
  @batch_update_mutex.synchronize do
    if item == :scheduled || item == :close
      add_current_batch_to_out_queue(item)
      return
    end
    status = try_add_item(item)
    if status != 0
      add_current_batch_to_out_queue(:add)
      if status == -1
        try_add_item(item)
      end
    end
  end
end

Private Instance Methods

add_current_batch_to_out_queue(from) click to toggle source

Adds batch to out queue

# File lib/logstash/outputs/cloudwatchlogs.rb, line 369
def add_current_batch_to_out_queue(from)
  if from == :scheduled && (Time.now - @last_batch_time) * 1000 < @buffer_duration
    return
  end
  if @in_batch.size == 0
    @last_batch_time = Time.now
    return
  end
  @logger.debug("Added batch with #{in_count} items in #{in_size} by #{from}") if @logger
  @out_queue.enq(@in_batch)
  @in_batch = Array.new
  @in_count = 0
  @in_size = 0
  @last_batch_time = Time.now
end
add_item(item) click to toggle source

Adds item to batch

# File lib/logstash/outputs/cloudwatchlogs.rb, line 362
def add_item(item)
  @in_batch << item
  @in_count += 1
  @in_size += @size_of_item_proc.call(item)
end
try_add_item(item) click to toggle source

Tries to add an item to buffer

# File lib/logstash/outputs/cloudwatchlogs.rb, line 344
def try_add_item(item)
  item_size = @size_of_item_proc.call(item)
  if @in_count + 1 == @max_batch_count ||
    @in_size + item_size == @max_batch_size
    # accept item, but can't accept more items
    add_item(item)
    return 1
  elsif @in_size + item_size > @max_batch_size
    # cannot accept item
    return -1
  else
    add_item(item)
    # accept item, and may accept next item
    return 0
  end
end