class LogStash::Outputs::Loki
Attributes
batches[R]
Public Instance Methods
add_entry_to_batch(e, tenant = 'default')
click to toggle source
Add an entry to the current batch returns false if the batch is full and the entry can't be added.
# File lib/logstash/outputs/loki.rb, line 193 def add_entry_to_batch(e, tenant = 'default') line = e.entry['line'] # we don't want to send empty lines. return true if line.to_s.strip.empty? tenant = 'default' if tenant.nil? or tenant.empty? if @batches.nil? @batches = Hash.new end if !@batches.key?(tenant) @batches[tenant] = Batch.new(e) return true end if @batches[tenant].size_bytes_after(line) > @batch_size return false end @batches[tenant].add(e) return true end
batch(tenant = 'default')
click to toggle source
# File lib/logstash/outputs/loki.rb, line 91 def batch(tenant = 'default') return nil if @batches.nil? return @batches[tenant] if !tenant.nil? && !tenant.empty? && @batches.key?(tenant) return @batches['default'] if @batches.key?('default') return nil end
close()
click to toggle source
# File lib/logstash/outputs/loki.rb, line 232 def close @entries.close @mutex.synchronize do @stop = true end @batch_wait_thread.join @batch_size_thread.join # if by any chance we still have a forming batch, we need to send it. @batches.keys.each { |tenant| send_batch_for_tenant(tenant) } @batches.clear() @batches = nil end
is_batch_expired(tenant = 'default')
click to toggle source
# File lib/logstash/outputs/loki.rb, line 217 def is_batch_expired(tenant = 'default') tenant = 'default' if tenant.nil? or tenant.empty? return !@batches.nil? && @batches.key?(tenant) && @batches[tenant].age() >= @batch_wait end
load_ssl()
click to toggle source
# File lib/logstash/outputs/loki.rb, line 152 def load_ssl @cert = OpenSSL::X509::Certificate.new(File.read(@cert)) if @cert @key = OpenSSL::PKey.read(File.read(@key)) if @key end
loki_http_request(payload, tenant = 'default')
click to toggle source
# File lib/logstash/outputs/loki.rb, line 258 def loki_http_request(payload, tenant = 'default') req = Net::HTTP::Post.new( @uri.request_uri ) req.add_field('Content-Type', 'application/json') if !tenant.nil? && !tenant.empty? && !tenant.eql?('default') req.add_field('X-Scope-OrgID', tenant) elsif !@tenant_id.nil? && !@tenant_id.empty? req.add_field('X-Scope-OrgID', @tenant_id) end req['User-Agent']= 'loki-logstash' req.basic_auth(@username, @password) if @username req.body = payload opts = ssl_opts(@uri) @logger.debug("sending #{req.body.length} bytes to loki. tenant #{tenant}") retry_count = 0 delay = @min_delay begin res = Net::HTTP.start(@uri.host, @uri.port, **opts) { |http| http.request(req) } return res if !res.nil? && res.code.to_i != 429 && res.code.to_i.div(100) != 5 raise StandardError.new res rescue StandardError => e retry_count += 1 @logger.warn("Failed to send batch, attempt: #{retry_count}/#{@retries}", :error_inspect => e.inspect, :error => e) if retry_count < @retries sleep delay if delay * 2 <= @max_delay delay = delay * 2 else delay = @max_delay end retry else @logger.error("Failed to send batch", :error_inspect => e.inspect, :error => e) return res end end end
max_batch_size()
click to toggle source
# File lib/logstash/outputs/loki.rb, line 98 def max_batch_size loop do @mutex.synchronize do return if @stop end e = @entries.deq return if e.nil? tenant = nil tenant = e.labels['tenant'] if !e.labels.nil? && e.labels.key?('tenant') tenant = 'default' if tenant.nil? or tenant.empty? @mutex.synchronize do if !add_entry_to_batch(e, tenant) @logger.debug("Max batch_size is reached. Sending batch to loki. Tenant #{tenant}") send_batch_for_tenant(tenant) @batches[tenant] = Batch.new(e) end end end end
max_batch_wait()
click to toggle source
# File lib/logstash/outputs/loki.rb, line 121 def max_batch_wait # minimum wait frequency is 10 milliseconds min_wait_checkfrequency = 1/100 max_wait_checkfrequency = @batch_wait if max_wait_checkfrequency < min_wait_checkfrequency max_wait_checkfrequency = min_wait_checkfrequency end loop do @mutex.synchronize do return if @stop end sleep(max_wait_checkfrequency) @mutex.synchronize do @batches.keys.clone.each { |tenant| if is_batch_expired(tenant) @logger.debug("Max batch_wait time is reached. Sending batch to loki. Tenant #{tenant}") send_batch_for_tenant(tenant) @batches.delete(tenant) end } end end end
receive(event)
click to toggle source
Receives logstash events
# File lib/logstash/outputs/loki.rb, line 228 def receive(event) @entries << Entry.new(event, @message_field) end
register()
click to toggle source
# File lib/logstash/outputs/loki.rb, line 58 def register @uri = URI.parse(@url) unless @uri.is_a?(URI::HTTP) || @uri.is_a?(URI::HTTPS) raise LogStash::ConfigurationError, "url parameter must be valid HTTP, currently '#{@url}'" end if @min_delay > @max_delay raise LogStash::ConfigurationError, "Min delay should be less than Max delay, currently 'Min delay is #{@min_delay} and Max delay is #{@max_delay}'" end @logger.info("Loki output plugin", :class => self.class.name) # initialize Queue and Mutex @entries = Queue.new @mutex = Mutex.new @stop = false # create nil batch object. @batches = Hash.new # validate certs if ssl_cert? load_ssl validate_ssl_key end # start batch_max_wait and batch_max_size threads @batch_wait_thread = Thread.new{max_batch_wait()} @batch_size_thread = Thread.new{max_batch_size()} end
send(batch, tenant = 'default')
click to toggle source
# File lib/logstash/outputs/loki.rb, line 248 def send(batch, tenant = 'default') payload = batch.to_json res = loki_http_request(payload, tenant) if res.is_a?(Net::HTTPSuccess) @logger.debug("Successfully pushed data to loki") else @logger.debug("failed payload", :payload => payload) end end
send_batch_for_tenant(tenant)
click to toggle source
# File lib/logstash/outputs/loki.rb, line 222 def send_batch_for_tenant(tenant) send(batch(tenant), tenant) end
ssl_cert?()
click to toggle source
# File lib/logstash/outputs/loki.rb, line 148 def ssl_cert? !@key.nil? && !@cert.nil? end
ssl_opts(uri)
click to toggle source
# File lib/logstash/outputs/loki.rb, line 163 def ssl_opts(uri) opts = { use_ssl: uri.scheme == 'https' } # disable server certificate verification if @insecure_skip_verify opts = opts.merge( verify_mode: OpenSSL::SSL::VERIFY_NONE ) end if !@cert.nil? && !@key.nil? opts = opts.merge( verify_mode: OpenSSL::SSL::VERIFY_PEER, cert: @cert, key: @key ) end unless @ca_cert.nil? opts = opts.merge( ca_file: @ca_cert ) end opts end
validate_ssl_key()
click to toggle source
# File lib/logstash/outputs/loki.rb, line 157 def validate_ssl_key if !@key.is_a?(OpenSSL::PKey::RSA) && !@key.is_a?(OpenSSL::PKey::DSA) raise LogStash::ConfigurationError, "Unsupported private key type '#{@key.class}''" end end