class Fluent::Plugin::Sumologic

Constants

DEFAULT_BUFFER_TYPE
DEFAULT_DATA_TYPE
DEFAULT_METRIC_FORMAT_TYPE
LOGS_DATA_TYPE
METRICS_DATA_TYPE

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_sumologic.rb, line 151
def initialize
  super
end

Public Instance Methods

configure(conf) click to toggle source

This method is called before starting.

Calls superclass method
# File lib/fluent/plugin/out_sumologic.rb, line 160
def configure(conf)

  compat_parameters_convert(conf, :buffer)

  unless conf['endpoint'] =~ URI::regexp
    raise Fluent::ConfigError, "Invalid SumoLogic endpoint url: #{conf['endpoint']}"
  end

  unless conf['data_type'].nil?
    unless conf['data_type'] =~ /\A(?:logs|metrics)\z/
      raise Fluent::ConfigError, "Invalid data_type #{conf['data_type']} must be logs or metrics"
    end
  end

  if conf['data_type'].nil? || conf['data_type'] == LOGS_DATA_TYPE
    unless conf['log_format'].nil?
      unless conf['log_format'] =~ /\A(?:json|text|json_merge|fields)\z/
        raise Fluent::ConfigError, "Invalid log_format #{conf['log_format']} must be text, json, json_merge or fields"
      end
    end
  end

  if conf['data_type'] == METRICS_DATA_TYPE && ! conf['metrics_data_type'].nil?
    unless conf['metrics_data_type'] =~ /\A(?:graphite|carbon2|pronetheus)\z/
      raise Fluent::ConfigError, "Invalid metrics_data_type #{conf['metrics_data_type']} must be graphite or carbon2 or prometheus"
    end
  end

  conf['custom_fields'] = validate_key_value_pairs(conf['custom_fields'])
  if conf['custom_fields'].nil?
    conf.delete 'custom_fields'
  end
  unless conf['custom_fields']
    @log.debug "Custom fields: #{conf['custom_fields']}"
  end

  conf['custom_dimensions'] = validate_key_value_pairs(conf['custom_dimensions'])
  if conf['custom_dimensions'].nil?
    conf.delete 'custom_dimensions'
  end
  unless conf['custom_dimensions']
    @log.debug "Custom dimensions: #{conf['custom_dimensions']}"
  end

  # For some reason default is set incorrectly in unit-tests
  if conf['sumo_client'].nil? || conf['sumo_client'].strip.length == 0
    conf['sumo_client'] = 'fluentd-output'
  end

  @sumo_conn = SumologicConnection.new(
    conf['endpoint'],
    conf['verify_ssl'],
    conf['open_timeout'].to_i,
    conf['proxy_uri'],
    conf['disable_cookies'],
    conf['sumo_client'],
    conf['compress'],
    conf['compress_encoding']
    )
  super
end
dump_log(log) click to toggle source

Strip sumo_metadata and dump to json

# File lib/fluent/plugin/out_sumologic.rb, line 249
def dump_log(log)
  log.delete('_sumo_metadata')
  begin
    parser = Yajl::Parser.new
    hash = parser.parse(log[@log_key])
    log[@log_key] = hash
    Yajl.dump(log)
  rescue
    Yajl.dump(log)
  end
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_sumologic.rb, line 261
def format(tag, time, record)
  if defined? time.nsec
    mstime = time * 1000 + (time.nsec / 1000000)
    [mstime, record].to_msgpack
  else
    [time, record].to_msgpack
  end
end
formatted_to_msgpack_binary() click to toggle source
# File lib/fluent/plugin/out_sumologic.rb, line 270
def formatted_to_msgpack_binary
  true
end
log_to_str(log) click to toggle source

Convert log to string and strip it

# File lib/fluent/plugin/out_sumologic.rb, line 297
def log_to_str(log)
  if log.is_a?(Array) or log.is_a?(Hash)
    log = Yajl.dump(log)
  end

  unless log.nil?
    log.strip!
  end

  return log
end
merge_json(record) click to toggle source

Used to merge log record into top level json

# File lib/fluent/plugin/out_sumologic.rb, line 233
def merge_json(record)
  if record.has_key?(@log_key)
    log = record[@log_key].strip
    if log[0].eql?('{') && log[-1].eql?('}')
      begin
        record = record.merge(JSON.parse(log))
        record.delete(@log_key)
      rescue JSON::ParserError
        # do nothing, ignore
      end
    end
  end
  record
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_sumologic.rb, line 155
def multi_workers_ready?
  true
end
shutdown() click to toggle source

This method is called when shutting down.

Calls superclass method
# File lib/fluent/plugin/out_sumologic.rb, line 228
def shutdown
  super
end
start() click to toggle source

This method is called when starting.

Calls superclass method
# File lib/fluent/plugin/out_sumologic.rb, line 223
def start
  super
end
sumo_key(sumo_metadata, chunk) click to toggle source
# File lib/fluent/plugin/out_sumologic.rb, line 274
def sumo_key(sumo_metadata, chunk)
  source_name = sumo_metadata['source'] || @source_name
  source_name = extract_placeholders(source_name, chunk) unless source_name.nil?

  source_category = sumo_metadata['category'] || @source_category
  source_category = extract_placeholders(source_category, chunk) unless source_category.nil?

  source_host = sumo_metadata['host'] || @source_host
  source_host = extract_placeholders(source_host, chunk) unless source_host.nil?

  fields = sumo_metadata['fields'] || ""
  fields = extract_placeholders(fields, chunk) unless fields.nil?

  { :source_name => "#{source_name}", :source_category => "#{source_category}",
    :source_host => "#{source_host}", :fields => "#{fields}" }
end
sumo_timestamp(time) click to toggle source

Convert timestamp to 13 digit epoch if necessary

# File lib/fluent/plugin/out_sumologic.rb, line 292
def sumo_timestamp(time)
  time.to_s.length == 13 ? time : time * 1000
end
validate_key_value_pairs(fields) click to toggle source
# File lib/fluent/plugin/out_sumologic.rb, line 386
def validate_key_value_pairs(fields)
  if fields.nil?
    return fields
  end

  fields = fields.split(",").select { |field|
    field.split('=').length == 2
  }

  if fields.length == 0
    return nil
  end

  fields.join(',')
end
write(chunk) click to toggle source

This method is called every flush interval. Write the buffer chunk

# File lib/fluent/plugin/out_sumologic.rb, line 310
def write(chunk)
  messages_list = {}

  # Sort messages
  chunk.msgpack_each do |time, record|
    # plugin dies randomly
    # https://github.com/uken/fluent-plugin-elasticsearch/commit/8597b5d1faf34dd1f1523bfec45852d380b26601#diff-ae62a005780cc730c558e3e4f47cc544R94
    next unless record.is_a? Hash
    sumo_metadata = record.fetch('_sumo_metadata', {:source => record[@source_name_key] })
    key           = sumo_key(sumo_metadata, chunk)
    log_format    = sumo_metadata['log_format'] || @log_format

    # Strip any unwanted newlines
    record[@log_key].chomp! if record[@log_key] && record[@log_key].respond_to?(:chomp!)

    case @data_type
    when 'logs'
      case log_format
      when 'text'
        log = log_to_str(record[@log_key])
      when 'json_merge'
        if @add_timestamp
          record = { @timestamp_key => sumo_timestamp(time) }.merge(record)
        end
        log = dump_log(merge_json(record))
      when 'fields'
        if @add_timestamp
          record = {  @timestamp_key => sumo_timestamp(time) }.merge(record)
        end
        log = dump_log(record)
      else
        if @add_timestamp
          record = { @timestamp_key => sumo_timestamp(time) }.merge(record)
        end
        log = dump_log(record)
      end
    when 'metrics'
      log = log_to_str(record[@log_key])
    end

    unless log.nil?
      if messages_list.key?(key)
        messages_list[key].push(log)
      else
        messages_list[key] = [log]
      end
    end

  end

  # Push logs to sumo
  messages_list.each do |key, messages|
    source_name, source_category, source_host, fields = key[:source_name], key[:source_category],
      key[:source_host], key[:fields]

    # Merge custom and record fields
    if fields.nil? || fields.strip.length == 0
      fields = @custom_fields
    else
      fields = [fields,@custom_fields].compact.join(",")
    end

    @sumo_conn.publish(
        messages.join("\n"),
        source_host         =source_host,
        source_category     =source_category,
        source_name         =source_name,
        data_type           =@data_type,
        metric_data_format  =@metric_data_format,
        collected_fields    =fields,
        dimensions          =@custom_dimensions
    )
  end

end