class Fluent::BaritoK8sOutput

Constants

LABEL_APP_GROUP_SECRET
LABEL_APP_NAME
LABEL_APP_SECRET
LABEL_PRODUCE_URL
PLUGIN_NAME

Public Instance Methods

application_group_secret(params) click to toggle source
# File lib/fluent/plugin/out_barito_k8s.rb, line 81
def application_group_secret(params)
  params[LABEL_APP_GROUP_SECRET]
end
application_name(params) click to toggle source
# File lib/fluent/plugin/out_barito_k8s.rb, line 85
def application_name(params)
  params[LABEL_APP_NAME]
end
application_secret(params) click to toggle source
# File lib/fluent/plugin/out_barito_k8s.rb, line 77
def application_secret(params)
  params[LABEL_APP_SECRET]
end
clean_attribute(record) click to toggle source
# File lib/fluent/plugin/out_barito_k8s.rb, line 89
def clean_attribute(record)
  # Delete kubernetes & docker field
  record.delete('kubernetes')
  record.delete('docker')
  record
end
format(tag, time, record) click to toggle source

Overide from BufferedOutput

# File lib/fluent/plugin/out_barito_k8s.rb, line 23
def format(tag, time, record)
  [tag, time, record].to_msgpack
end
merge_log_attribute(record) click to toggle source
# File lib/fluent/plugin/out_barito_k8s.rb, line 96
def merge_log_attribute(record)
  message_log = nil
  begin
    message_log = JSON.parse(record['log'])
  rescue
  end

  if !message_log.nil?
    return record.merge(message_log)
  end

  record
end
produce_url(params) click to toggle source
# File lib/fluent/plugin/out_barito_k8s.rb, line 73
def produce_url(params)
  params[LABEL_PRODUCE_URL]
end
start() click to toggle source

Overide from BufferedOutput

Calls superclass method
# File lib/fluent/plugin/out_barito_k8s.rb, line 18
def start
  super
end
write(chunk) click to toggle source

Overide from BufferedOutput

# File lib/fluent/plugin/out_barito_k8s.rb, line 28
def write(chunk)
  chunk.msgpack_each do |tag, time, record|

    # Kubernetes annotations
    k8s_metadata = record['kubernetes']
    params = k8s_metadata['annotations']

    # Skip record if no annotations found
    next if params.nil?
    url = produce_url(params)
    app_secret = application_secret(params)
    app_group_secret = application_group_secret(params)
    app_name = application_name(params)

    next if url.nil?

    if app_secret.nil?
      next if app_group_secret.nil? or app_name.nil?
      header = {
        content_type: :json,
        'X-App-Group-Secret' => app_group_secret,
        'X-App-Name' => app_name
      }
    else
      header = {content_type: :json, 'X-App-Secret' => app_secret}
    end

    record = clean_attribute(record)
    transport = Fluent::Plugin::BaritoTransport.new(url, log)
    trail = Fluent::Plugin::ClientTrail.new(true)
    timber = Fluent::Plugin::TimberFactory::create_timber(tag, time, record, trail)
    new_timber = merge_log_attribute(timber)

    # Add kubernetes information
    new_timber['k8s_metadata'] = {
      'pod_name' => k8s_metadata['pod_name'],
      'namespace_name' => k8s_metadata['namespace_name'],
      'container_name' => k8s_metadata['container_name'],
      'host' => k8s_metadata['host']
    }

    transport.send(new_timber, header)
  end
end