class LogStash::Outputs::ElasticSearch::Buffer
Public Class Methods
new(logger, max_size, flush_interval, &block)
click to toggle source
# File lib/logstash/outputs/elasticsearch/buffer.rb, line 6
# Builds an empty buffer and starts the background interval flusher.
#
# @param logger [Object] kept in @logger; interval_flush calls debug?, debug and warn on it
# @param max_size [Integer] buffer length at which push_unsafe triggers a flush
# @param flush_interval [Numeric] seconds; compared with the time elapsed since the last flush
# @param block [Proc] kept as @submit_proc and called with the buffer whenever it is flushed
def initialize(logger, max_size, flush_interval, &block)
  @logger = logger
  @max_size = max_size
  @flush_interval = flush_interval
  @submit_proc = block

  # You generally need to acquire this mutex for anything that modifies state.
  @operations_mutex = Mutex.new
  # NOTE(review): no method in this listing reads @operations_lock - confirm it is still needed.
  @operations_lock = java.util.concurrent.locks.ReentrantLock.new
  @stopping = Concurrent::AtomicBoolean.new(false)

  @buffer = []
  @last_flush = Time.now

  # Spawned last: the flusher thread reads the state assigned above.
  @flush_thread = spawn_interval_flusher
end
Public Instance Methods
contents()
click to toggle source
# File lib/logstash/outputs/elasticsearch/buffer.rb, line 55
# Returns the internal buffer array, fetched while holding the operations mutex.
#
# The value returned is the array yielded by #synchronize itself, not a copy.
#
# @return [Array] the buffered items
def contents
  synchronize do |items|
    items
  end
end
flush()
click to toggle source
# File lib/logstash/outputs/elasticsearch/buffer.rb, line 38
# Flushes the buffer while holding the operations mutex.
#
# @return [Time] whatever flush_unsafe returns (the refreshed last-flush timestamp)
def flush
  synchronize do |_items|
    flush_unsafe
  end
end
push(item)
click to toggle source
# File lib/logstash/outputs/elasticsearch/buffer.rb, line 23
# Appends a single item to the buffer while holding the operations mutex.
# push_unsafe flushes the buffer once it has reached @max_size.
#
# @param item [Object] the entry to buffer
def push(item)
  synchronize { push_unsafe(item) }
end
Also aliased as: <<
push_multi(items)
click to toggle source
Push multiple items onto the buffer in a single operation
# File lib/logstash/outputs/elasticsearch/buffer.rb, line 31
# Appends every element of +items+ to the buffer under a single acquisition
# of the operations mutex.
#
# @param items [Array] the entries to buffer, pushed in order
# @raise [ArgumentError] when +items+ is not an Array
# @return [Array] the +items+ array that was passed in
def push_multi(items)
  unless items.is_a?(Array)
    raise ArgumentError, "push multi takes an array!, not an #{items.class}!"
  end

  synchronize do |_items|
    items.each do |entry|
      push_unsafe(entry)
    end
  end
end
stop(do_flush=true,wait_complete=true)
click to toggle source
# File lib/logstash/outputs/elasticsearch/buffer.rb, line 42
# Marks the buffer as stopping and optionally flushes it and waits for the
# background flusher thread to finish. Only the first call does any work;
# later calls return immediately.
#
# @param do_flush [Boolean] flush whatever is buffered before returning
# @param wait_complete [Boolean] block until the interval flusher thread has exited
def stop(do_flush=true,wait_complete=true)
  # make_true reports whether the flag actually changed, so checking its
  # result is an atomic "first caller wins" test; a separate stopping? check
  # followed by make_true would let two concurrent callers both proceed.
  return unless @stopping.make_true

  # Only the flush needs the lock.
  synchronize { flush_unsafe } if do_flush

  # Join only after the mutex has been released. The flusher thread may
  # already have passed its stopping? check and be waiting for the mutex;
  # joining it while still holding the mutex would deadlock both threads.
  @flush_thread.join if wait_complete
end
synchronize() { |buffer| ... }
click to toggle source
For operating on the buffer contents from outside the class: takes a block, yields the internal buffer to it, and runs the block while holding the internal mutex.
# File lib/logstash/outputs/elasticsearch/buffer.rb, line 62
# Runs the given block while holding @operations_mutex, passing it the
# internal buffer array.
#
# @yieldparam buffer [Array] the internal buffer (the live array, not a copy)
# @return [Object] the value returned by the block
def synchronize
  @operations_mutex.synchronize do
    yield @buffer
  end
end
Private Instance Methods
flush_unsafe()
click to toggle source
# File lib/logstash/outputs/elasticsearch/buffer.rb, line 107
# Hands the buffered items to @submit_proc and empties the buffer, then
# records the flush time. Takes no lock itself.
#
# The submit proc receives the internal array, which is cleared right after
# the call returns.
#
# @return [Time] the new @last_flush timestamp
def flush_unsafe
  unless @buffer.empty?
    @submit_proc.call(@buffer)
    @buffer.clear
  end

  # Refreshed even when nothing was buffered, to keep the interval timer correct.
  @last_flush = Time.now
end
interval_flush()
click to toggle source
# File lib/logstash/outputs/elasticsearch/buffer.rb, line 87
# Flushes the buffer if at least @flush_interval seconds have passed since
# the last flush. Takes no lock itself.
#
# Anything raised while logging or flushing is logged as a warning and not
# re-raised.
def interval_flush
  return unless last_flush_seconds_ago >= @flush_interval

  begin
    if @logger.debug?
      @logger.debug("Flushing buffer at interval",
                    :instance => self.inspect,
                    :interval => @flush_interval)
    end
    flush_unsafe
  rescue StandardError => e
    @logger.warn("Error flushing buffer at interval!",
                 :instance => self.inspect,
                 :message => e.message,
                 :class => e.class.name,
                 :backtrace => e.backtrace)
  rescue Exception => e
    # Deliberately broad: non-StandardError failures are logged as well.
    @logger.warn("Exception flushing buffer at interval!",
                 :error => e.message,
                 :class => e.class.name)
  end
end
last_flush_seconds_ago()
click to toggle source
# File lib/logstash/outputs/elasticsearch/buffer.rb, line 116
# Seconds elapsed between @last_flush and now.
#
# @return [Float] difference of the two Time values, in seconds
def last_flush_seconds_ago
  now = Time.now
  now - @last_flush
end
push_unsafe(item)
click to toggle source
These methods are private for various reasons, chief among them thread safety! Many of them are only safe to call while @operations_mutex is held.
# File lib/logstash/outputs/elasticsearch/buffer.rb, line 70
# Appends +item+ to the buffer and flushes once the buffer holds at least
# @max_size items. Takes no lock itself.
#
# @param item [Object] the entry to buffer
# @return [Time, nil] flush_unsafe's result when a flush happened, otherwise nil
def push_unsafe(item)
  @buffer.push(item)
  return unless @buffer.size >= @max_size

  flush_unsafe
end
spawn_interval_flusher()
click to toggle source
# File lib/logstash/outputs/elasticsearch/buffer.rb, line 77
# Starts the background thread that runs interval_flush under the operations
# mutex every 0.2 seconds until the buffer is marked as stopping.
#
# @return [Thread] the flusher thread
def spawn_interval_flusher
  Thread.new do
    # Sleep first, then re-check the stop flag before every flush attempt.
    sleep(0.2)
    until stopping?
      synchronize { interval_flush }
      sleep(0.2)
    end
  end
end
stopping?()
click to toggle source
# File lib/logstash/outputs/elasticsearch/buffer.rb, line 120
# Whether the @stopping flag has been set.
#
# @return [Boolean] the result of @stopping.true?
def stopping?
  flag = @stopping
  flag.true?
end