class Fluent::BaritoBatchK8sOutput

Constants

PLUGIN_NAME

Public Instance Methods

clean_attribute(record) click to toggle source
# File lib/fluent/plugin/out_barito_batch_k8s.rb, line 72
def clean_attribute(record)
  # Delete kubernetes & docker field
  record.delete('kubernetes')
  record.delete('docker')
  record
end
format(tag, time, record) click to toggle source

Overide from BufferedOutput

# File lib/fluent/plugin/out_barito_batch_k8s.rb, line 25
def format(tag, time, record)
  [tag, time, record].to_msgpack
end
merge_log_attribute(record) click to toggle source
# File lib/fluent/plugin/out_barito_batch_k8s.rb, line 79
def merge_log_attribute(record)
  message_log = nil
  begin
    message_log = JSON.parse(record['log'])
  rescue
  end

  if !message_log.nil?
    return record.merge(message_log)
  end

  record
end
start() click to toggle source

Overide from BufferedOutput

Calls superclass method
# File lib/fluent/plugin/out_barito_batch_k8s.rb, line 20
def start
  super
end
write(chunk) click to toggle source

Overide from BufferedOutput

# File lib/fluent/plugin/out_barito_batch_k8s.rb, line 30
def write(chunk)
  data = {
    'items' => []
  }

  transport = Fluent::Plugin::BaritoTransport.new(@produce_url, log)
  chunk.msgpack_each do |tag, time, record|

    # Kubernetes annotations
    k8s_metadata = record['kubernetes']

    record = clean_attribute(record)
    trail = Fluent::Plugin::ClientTrail.new(true)
    timber = Fluent::Plugin::TimberFactory::create_timber(tag, time, record, trail)
    new_timber = merge_log_attribute(timber)

    # Add kubernetes information
    new_timber['k8s_metadata'] = {
      'pod_name' => k8s_metadata['pod_name'],
      'namespace_name' => k8s_metadata['namespace_name'],
      'container_name' => k8s_metadata['container_name'],
      'host' => k8s_metadata['host'],
      'cluster_name' => @cluster_name
    }

    data['items'] << new_timber
  end

  if @application_secret.nil? or @application_secret.empty?
      return if @application_group_secret.nil? or @application_name.nil?
      header = {
        content_type: :json,
        'X-App-Group-Secret' => @application_group_secret,
        'X-App-Name' => @application_name
      }
  else
    header = {content_type: :json, 'X-App-Secret' => @application_secret}
  end

  transport.send(data, header)
end