class LogStash::Outputs::SumoLogic::MessageQueue

Public Class Methods

new(stats, config) click to toggle source
# File lib/logstash/outputs/sumologic/message_queue.rb, line 9
# Builds a bounded in-memory queue together with its byte counter.
#
# @param stats [Object] statistics collector stored for later enqueue/dequeue accounting
# @param config [Hash] plugin settings; "queue_max" is read and, when missing
#   or nil, written back as 1 by the `||=` below
def initialize(stats, config)
  @stats = stats

  # Fall back to a capacity of 1 when the setting is absent or below 1.
  requested = (config["queue_max"] ||= 1)
  @queue_max = requested < 1 ? 1 : requested

  @queue = SizedQueue.new(@queue_max)
  log_info("initialize memory queue", :max => @queue_max)
  @queue_bytesize = Concurrent::AtomicFixnum.new
end

Public Instance Methods

bytesize() click to toggle source
# File lib/logstash/outputs/sumologic/message_queue.rb, line 52
# Reports the payload bytes currently accounted for in the queue.
#
# @return [Integer] current value of the byte counter
def bytesize
  @queue_bytesize.value
end
deq() click to toggle source
# File lib/logstash/outputs/sumologic/message_queue.rb, line 30
# Removes the next batch from the queue and updates the byte accounting.
#
# Delegates to the underlying queue's +deq+, which (for a SizedQueue) blocks
# while the queue is empty.
#
# @return [Object] the dequeued batch; it must respond to +payload+
def deq
  batch = @queue.deq
  batch_size = batch.payload.bytesize
  @stats.record_deque(batch_size)
  @queue_bytesize.update { |total| total - batch_size }
  # Log the counter's numeric value, not the atomic wrapper object, so the
  # :bytes_in_queue field is a plain byte count.
  log_dbg("dequeue",
    :objects_in_queue => size,
    :bytes_in_queue => @queue_bytesize.value,
    :size => batch_size)
  batch
end
drain() click to toggle source
# File lib/logstash/outputs/sumologic/message_queue.rb, line 42
# Dequeues once for every batch that is in the queue at the time of the call.
#
# @return [Array] the dequeued batches, in dequeue order
def drain
  pending = @queue.size
  Array.new(pending) { deq }
end
enq(batch) click to toggle source
# File lib/logstash/outputs/sumologic/message_queue.rb, line 17
# Adds a batch to the queue and updates the byte accounting.
#
# Batches whose payload is empty (0 bytes) are ignored. Delegates to the
# underlying queue's +enq+, which (for a SizedQueue) blocks while the queue
# is full.
#
# @param batch [Object] must respond to +payload+, whose +bytesize+ is counted
# @return [Object, nil] nil when the batch was skipped; otherwise whatever
#   +log_dbg+ returns
def enq(batch)
  batch_size = batch.payload.bytesize
  return unless batch_size > 0

  @queue.enq(batch)
  @stats.record_enque(batch_size)
  @queue_bytesize.update { |total| total + batch_size }
  # Log the counter's numeric value, not the atomic wrapper object, so the
  # :bytes_in_queue field is a plain byte count.
  log_dbg("enqueue",
    :objects_in_queue => size,
    :bytes_in_queue => @queue_bytesize.value,
    :size => batch_size)
end
size() click to toggle source
# File lib/logstash/outputs/sumologic/message_queue.rb, line 48
# Reports how many batches are currently held in the queue.
#
# @return [Integer] number of queued batches
def size
  @queue.size
end