class LogStash::Outputs::SumoLogic::MessageQueue
Public Class Methods
new(stats, config)
click to toggle source
# File lib/logstash/outputs/sumologic/message_queue.rb, line 9 def initialize(stats, config) @queue_max = (config["queue_max"] ||= 1) < 1 ? 1 : config["queue_max"] @queue = SizedQueue::new(@queue_max) log_info("initialize memory queue", :max => @queue_max) @queue_bytesize = Concurrent::AtomicFixnum.new @stats = stats end
Public Instance Methods
bytesize()
click to toggle source
# File lib/logstash/outputs/sumologic/message_queue.rb, line 52 def bytesize() @queue_bytesize.value end
deq()
click to toggle source
# File lib/logstash/outputs/sumologic/message_queue.rb, line 30 def deq() batch = @queue.deq() batch_size = batch.payload.bytesize @stats.record_deque(batch_size) @queue_bytesize.update { |v| v - batch_size } log_dbg("dequeue", :objects_in_queue => size, :bytes_in_queue => @queue_bytesize, :size => batch_size) batch end
drain()
click to toggle source
# File lib/logstash/outputs/sumologic/message_queue.rb, line 42 def drain() @queue.size.times.map { deq() } end
enq(batch)
click to toggle source
# File lib/logstash/outputs/sumologic/message_queue.rb, line 17 def enq(batch) batch_size = batch.payload.bytesize if (batch_size > 0) @queue.enq(batch) @stats.record_enque(batch_size) @queue_bytesize.update { |v| v + batch_size } log_dbg("enqueue", :objects_in_queue => size, :bytes_in_queue => @queue_bytesize, :size => batch_size) end end
size()
click to toggle source
# File lib/logstash/outputs/sumologic/message_queue.rb, line 48 def size() @queue.size() end