class LogStash::Outputs::SumoLogic::Piler
Attributes
is_pile[R]
Public Class Methods
new(queue, stats, config)
click to toggle source
# File lib/logstash/outputs/sumologic/piler.rb, line 13 def initialize(queue, stats, config) @interval = config["interval"] ||= 0 @pile_max = config["pile_max"] ||= 0 @queue = queue @stats = stats @stopping = Concurrent::AtomicBoolean.new(false) @payload_builder = PayloadBuilder.new(@stats, config) @header_builder = HeaderBuilder.new(config) @is_pile = (@interval > 0 && @pile_max > 0) if (@is_pile) @pile = Hash.new("") @semaphore = Mutex.new end end
Public Instance Methods
input(event)
click to toggle source
# File lib/logstash/outputs/sumologic/piler.rb, line 55 def input(event) if (@stopping.true?) log_warn("piler is shutting down, event is dropped", "event" => event) else headers = @header_builder.build(event) payload = @payload_builder.build(event) if (@is_pile) @semaphore.synchronize { content = @pile[headers] size = content.bytesize if size + payload.bytesize > @pile_max @queue.enq(Batch.new(headers, content)) @pile[headers] = "" end @pile[headers] = @pile[headers].blank? ? payload : "#{@pile[headers]}\n#{payload}" } else @queue.enq(Batch.new(headers, payload)) end # if end end
start()
click to toggle source
# File lib/logstash/outputs/sumologic/piler.rb, line 30 def start() @stopping.make_false() if (@is_pile) log_info("starting piler...", :max => @pile_max, :timeout => @interval) @piler_t = Thread.new { while @stopping.false? Stud.stoppable_sleep(@interval) { @stopping.true? } log_dbg("timeout", :timeout => @interval) enq_and_clear() end # while } end # if end
stop()
click to toggle source
# File lib/logstash/outputs/sumologic/piler.rb, line 46 def stop() @stopping.make_true() if (@is_pile) log_info("shutting down piler in #{@interval * 2} secs ...") @piler_t.join(@interval * 2) log_info("piler is fully shutted down") end end
Private Instance Methods
enq_and_clear()
click to toggle source
# File lib/logstash/outputs/sumologic/piler.rb, line 79 def enq_and_clear() @semaphore.synchronize { @pile.each do |headers, content| @queue.enq(Batch.new(headers, content)) end @pile.clear() } end