class LogStash::Outputs::Loki

Attributes

batch[R]

Public Instance Methods

add_entry_to_batch(e) click to toggle source

Add an entry to the current batch returns false if the batch is full and the entry can't be added.

# File lib/logstash/outputs/loki.rb, line 177
def add_entry_to_batch(e)
  line = e.entry['line']
  # we don't want to send empty lines.
  return true if line.to_s.strip.empty?

  if @batch.nil?
    @batch = Batch.new(e)
    return true
  end

  if @batch.size_bytes_after(line) > @batch_size
    return false
  end
  @batch.add(e)
  return true
end
close() click to toggle source
# File lib/logstash/outputs/loki.rb, line 204
def close
  @entries.close
  @mutex.synchronize do 
    @stop = true 
  end
  @batch_wait_thread.join
  @batch_size_thread.join

  # if by any chance we still have a forming batch, we need to send it.
  send(@batch) if !@batch.nil?
  @batch = nil
end
is_batch_expired() click to toggle source
# File lib/logstash/outputs/loki.rb, line 194
def is_batch_expired
  return !@batch.nil? && @batch.age() >= @batch_wait
end
load_ssl() click to toggle source
# File lib/logstash/outputs/loki.rb, line 136
def load_ssl
  @cert = OpenSSL::X509::Certificate.new(File.read(@cert)) if @cert
  @key = OpenSSL::PKey.read(File.read(@key)) if @key
end
loki_http_request(payload) click to toggle source
# File lib/logstash/outputs/loki.rb, line 227
def loki_http_request(payload)
  req = Net::HTTP::Post.new(
    @uri.request_uri
  )
  req.add_field('Content-Type', 'application/json')
  req.add_field('X-Scope-OrgID', @tenant_id) if @tenant_id
  req['User-Agent']= 'loki-logstash'
  req.basic_auth(@username, @password) if @username
  req.body = payload

  opts = ssl_opts(@uri)

  @logger.debug("sending #{req.body.length} bytes to loki")
  retry_count = 0
  delay = @min_delay
  begin
    res = Net::HTTP.start(@uri.host, @uri.port, **opts) { |http|
      http.request(req)
    }
    return res if !res.nil? && res.code.to_i != 429 && res.code.to_i.div(100) != 5
    raise StandardError.new res
  rescue StandardError => e
    retry_count += 1
    @logger.warn("Failed to send batch, attempt: #{retry_count}/#{@retries}", :error_inspect => e.inspect, :error => e)
    if retry_count < @retries
      sleep delay
      if delay * 2 <= @max_delay
        delay = delay * 2
      else
        delay = @max_delay
      end
      retry
    else
      @logger.error("Failed to send batch", :error_inspect => e.inspect, :error => e)
      return res
    end
  end
end
max_batch_size() click to toggle source
# File lib/logstash/outputs/loki.rb, line 89
def max_batch_size
  loop do
    @mutex.synchronize do
      return if @stop
    end

    e = @entries.deq
    return if e.nil?

    @mutex.synchronize do
      if !add_entry_to_batch(e)
        @logger.debug("Max batch_size is reached. Sending batch to loki")
        send(@batch)
        @batch = Batch.new(e)
      end
    end
  end
end
max_batch_wait() click to toggle source
# File lib/logstash/outputs/loki.rb, line 108
def max_batch_wait
  # minimum wait frequency is 10 milliseconds
        min_wait_checkfrequency = 1/100
        max_wait_checkfrequency = @batch_wait
        if max_wait_checkfrequency < min_wait_checkfrequency
                max_wait_checkfrequency = min_wait_checkfrequency
  end

  loop do
    @mutex.synchronize do
      return if @stop
    end

    sleep(max_wait_checkfrequency)
    if is_batch_expired
      @mutex.synchronize do
        @logger.debug("Max batch_wait time is reached. Sending batch to loki")
        send(@batch)
        @batch = nil
      end
    end
  end
end
receive(event) click to toggle source

Receives logstash events

# File lib/logstash/outputs/loki.rb, line 200
def receive(event)
  @entries << Entry.new(event, @message_field)
end
register() click to toggle source
# File lib/logstash/outputs/loki.rb, line 58
def register
  @uri = URI.parse(@url)
  unless @uri.is_a?(URI::HTTP) || @uri.is_a?(URI::HTTPS)
    raise LogStash::ConfigurationError, "url parameter must be valid HTTP, currently '#{@url}'"
  end

  if @min_delay > @max_delay
    raise LogStash::ConfigurationError, "Min delay should be less than Max delay, currently 'Min delay is #{@min_delay} and Max delay is #{@max_delay}'"
  end

  @logger.info("Loki output plugin", :class => self.class.name)

  # initialize Queue and Mutex
  @entries = Queue.new 
  @mutex = Mutex.new
  @stop = false

  # create nil batch object.
  @batch = nil

  # validate certs
  if ssl_cert?
    load_ssl
    validate_ssl_key
  end

  # start batch_max_wait and batch_max_size threads
  @batch_wait_thread = Thread.new{max_batch_wait()}
  @batch_size_thread = Thread.new{max_batch_size()}
end
send(batch) click to toggle source
# File lib/logstash/outputs/loki.rb, line 217
def send(batch)
  payload = batch.to_json
  res = loki_http_request(payload)
  if res.is_a?(Net::HTTPSuccess)
    @logger.debug("Successfully pushed data to loki")
  else
    @logger.debug("failed payload", :payload => payload)
  end
end
ssl_cert?() click to toggle source
# File lib/logstash/outputs/loki.rb, line 132
def ssl_cert?
  !@key.nil? && !@cert.nil?
end
ssl_opts(uri) click to toggle source
# File lib/logstash/outputs/loki.rb, line 147
def ssl_opts(uri)
  opts = {
    use_ssl: uri.scheme == 'https'
  }

   # disable server certificate verification
  if @insecure_skip_verify
    opts = opts.merge(
      verify_mode: OpenSSL::SSL::VERIFY_NONE
    )
  end

  if !@cert.nil? && !@key.nil?
    opts = opts.merge(
      verify_mode: OpenSSL::SSL::VERIFY_PEER,
      cert: @cert,
      key: @key
    )
  end

  unless @ca_cert.nil?
    opts = opts.merge(
      ca_file: @ca_cert
    )
  end
  opts
end
validate_ssl_key() click to toggle source
# File lib/logstash/outputs/loki.rb, line 141
def validate_ssl_key
  if !@key.is_a?(OpenSSL::PKey::RSA) && !@key.is_a?(OpenSSL::PKey::DSA)
    raise LogStash::ConfigurationError, "Unsupported private key type '#{@key.class}''"
  end
end