class Fluent::Plugin::Sumologic
Constants
- DEFAULT_BUFFER_TYPE
- DEFAULT_DATA_TYPE
- DEFAULT_METRIC_FORMAT_TYPE
- LOGS_DATA_TYPE
- METRICS_DATA_TYPE
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_sumologic.rb, line 151 def initialize super end
Public Instance Methods
configure(conf)
click to toggle source
This method is called before starting.
Calls superclass method
# File lib/fluent/plugin/out_sumologic.rb, line 160
# Validate the plugin configuration and build the SumoLogic HTTP connection.
# Called by Fluentd before the plugin is started.
#
# conf - the Fluent::Config::Element holding the raw configuration.
#
# Raises Fluent::ConfigError when the endpoint URL, data_type, log_format
# or metrics_data_type values are invalid.
def configure(conf)
  compat_parameters_convert(conf, :buffer)

  unless conf['endpoint'] =~ URI::regexp
    raise Fluent::ConfigError, "Invalid SumoLogic endpoint url: #{conf['endpoint']}"
  end

  unless conf['data_type'].nil?
    unless conf['data_type'] =~ /\A(?:logs|metrics)\z/
      raise Fluent::ConfigError, "Invalid data_type #{conf['data_type']} must be logs or metrics"
    end
  end

  # log_format only applies to the logs data type (the default).
  if conf['data_type'].nil? || conf['data_type'] == LOGS_DATA_TYPE
    unless conf['log_format'].nil?
      unless conf['log_format'] =~ /\A(?:json|text|json_merge|fields)\z/
        raise Fluent::ConfigError, "Invalid log_format #{conf['log_format']} must be text, json, json_merge or fields"
      end
    end
  end

  if conf['data_type'] == METRICS_DATA_TYPE && !conf['metrics_data_type'].nil?
    # FIX: the pattern previously contained the misspelling "pronetheus",
    # which rejected the documented value "prometheus" (the error message
    # below already named the correct spelling).
    unless conf['metrics_data_type'] =~ /\A(?:graphite|carbon2|prometheus)\z/
      raise Fluent::ConfigError, "Invalid metrics_data_type #{conf['metrics_data_type']} must be graphite or carbon2 or prometheus"
    end
  end

  # Drop malformed key=value pairs; remove the key entirely when nothing valid remains.
  conf['custom_fields'] = validate_key_value_pairs(conf['custom_fields'])
  if conf['custom_fields'].nil?
    conf.delete 'custom_fields'
  end
  # FIX: guard was inverted (logged only when the value was absent);
  # log the custom fields when they ARE present.
  unless conf['custom_fields'].nil?
    @log.debug "Custom fields: #{conf['custom_fields']}"
  end

  conf['custom_dimensions'] = validate_key_value_pairs(conf['custom_dimensions'])
  if conf['custom_dimensions'].nil?
    conf.delete 'custom_dimensions'
  end
  # FIX: same inverted guard as custom_fields above.
  unless conf['custom_dimensions'].nil?
    @log.debug "Custom dimensions: #{conf['custom_dimensions']}"
  end

  # For some reason default is set incorrectly in unit-tests
  if conf['sumo_client'].nil? || conf['sumo_client'].strip.length == 0
    conf['sumo_client'] = 'fluentd-output'
  end

  @sumo_conn = SumologicConnection.new(
    conf['endpoint'],
    conf['verify_ssl'],
    conf['open_timeout'].to_i,
    conf['proxy_uri'],
    conf['disable_cookies'],
    conf['sumo_client'],
    conf['compress'],
    conf['compress_encoding']
  )
  super
end
dump_log(log)
click to toggle source
Strip sumo_metadata and dump to json
# File lib/fluent/plugin/out_sumologic.rb, line 249
# Strip sumo_metadata and dump to json.
# If the payload under @log_key parses as JSON it is inlined as a structure
# before dumping; otherwise the record is dumped with the raw string intact.
def dump_log(log)
  log.delete('_sumo_metadata')
  begin
    log[@log_key] = Yajl::Parser.new.parse(log[@log_key])
    Yajl.dump(log)
  rescue
    # payload was not valid JSON - dump the record as-is
    Yajl.dump(log)
  end
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_sumologic.rb, line 261
# Serialize one event as msgpack. When the time object exposes #nsec
# (Fluent::EventTime / Time) the timestamp is converted to milliseconds;
# plain integer timestamps are passed through unchanged.
def format(tag, time, record)
  stamp = defined?(time.nsec) ? time * 1000 + (time.nsec / 1000000) : time
  [stamp, record].to_msgpack
end
formatted_to_msgpack_binary()
click to toggle source
# File lib/fluent/plugin/out_sumologic.rb, line 270 def formatted_to_msgpack_binary true end
log_to_str(log)
click to toggle source
Convert log to string and strip it
# File lib/fluent/plugin/out_sumologic.rb, line 297
# Convert log to string and strip it.
# Arrays and Hashes are serialized to JSON first; nil passes through as nil.
def log_to_str(log)
  log = Yajl.dump(log) if log.is_a?(Array) || log.is_a?(Hash)
  log.strip! unless log.nil?
  log
end
merge_json(record)
click to toggle source
Used to merge log record into top level json
# File lib/fluent/plugin/out_sumologic.rb, line 233
# Used to merge log record into top level json.
# When the @log_key payload looks like a JSON object ("{...}"), its keys are
# merged into the record and the raw payload key is removed. Anything that
# fails to parse leaves the record untouched.
def merge_json(record)
  return record unless record.has_key?(@log_key)

  body = record[@log_key].strip
  if body.start_with?('{') && body.end_with?('}')
    begin
      merged = record.merge(JSON.parse(body))
      merged.delete(@log_key)
      record = merged
    rescue JSON::ParserError
      # not valid JSON after all - keep the original record
    end
  end
  record
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_sumologic.rb, line 155 def multi_workers_ready? true end
shutdown()
click to toggle source
This method is called when shutting down.
Calls superclass method
# File lib/fluent/plugin/out_sumologic.rb, line 228 def shutdown super end
start()
click to toggle source
This method is called when starting.
Calls superclass method
# File lib/fluent/plugin/out_sumologic.rb, line 223 def start super end
sumo_key(sumo_metadata, chunk)
click to toggle source
# File lib/fluent/plugin/out_sumologic.rb, line 274
# Build the grouping key for a record: source name/category/host and fields,
# taken from the record's sumo_metadata with the configured values as
# fallback. Chunk placeholders (e.g. ${tag}) are expanded on non-nil values.
# All values are stringified, so missing entries become "".
def sumo_key(sumo_metadata, chunk)
  name     = sumo_metadata['source']   || @source_name
  category = sumo_metadata['category'] || @source_category
  host     = sumo_metadata['host']     || @source_host
  fields   = sumo_metadata['fields']   || ""

  name     = extract_placeholders(name, chunk)     unless name.nil?
  category = extract_placeholders(category, chunk) unless category.nil?
  host     = extract_placeholders(host, chunk)     unless host.nil?
  fields   = extract_placeholders(fields, chunk)   unless fields.nil?

  {
    :source_name     => name.to_s,
    :source_category => category.to_s,
    :source_host     => host.to_s,
    :fields          => fields.to_s
  }
end
sumo_timestamp(time)
click to toggle source
Convert timestamp to 13 digit epoch if necessary
# File lib/fluent/plugin/out_sumologic.rb, line 292
# Convert timestamp to 13 digit epoch (milliseconds) if necessary.
# A value whose decimal representation is already 13 digits long is assumed
# to be in milliseconds and returned unchanged.
def sumo_timestamp(time)
  return time if time.to_s.length == 13

  time * 1000
end
validate_key_value_pairs(fields)
click to toggle source
# File lib/fluent/plugin/out_sumologic.rb, line 386
# Keep only well-formed "key=value" entries from a comma-separated list.
# Returns nil when the input is nil or when no valid pair survives,
# otherwise the surviving pairs rejoined with commas.
def validate_key_value_pairs(fields)
  return nil if fields.nil?

  valid_pairs = fields.split(",").select { |pair| pair.split('=').length == 2 }
  valid_pairs.empty? ? nil : valid_pairs.join(',')
end
write(chunk)
click to toggle source
This method is called every flush interval. Write the buffer chunk
# File lib/fluent/plugin/out_sumologic.rb, line 310 def write(chunk) messages_list = {} # Sort messages chunk.msgpack_each do |time, record| # plugin dies randomly # https://github.com/uken/fluent-plugin-elasticsearch/commit/8597b5d1faf34dd1f1523bfec45852d380b26601#diff-ae62a005780cc730c558e3e4f47cc544R94 next unless record.is_a? Hash sumo_metadata = record.fetch('_sumo_metadata', {:source => record[@source_name_key] }) key = sumo_key(sumo_metadata, chunk) log_format = sumo_metadata['log_format'] || @log_format # Strip any unwanted newlines record[@log_key].chomp! if record[@log_key] && record[@log_key].respond_to?(:chomp!) case @data_type when 'logs' case log_format when 'text' log = log_to_str(record[@log_key]) when 'json_merge' if @add_timestamp record = { @timestamp_key => sumo_timestamp(time) }.merge(record) end log = dump_log(merge_json(record)) when 'fields' if @add_timestamp record = { @timestamp_key => sumo_timestamp(time) }.merge(record) end log = dump_log(record) else if @add_timestamp record = { @timestamp_key => sumo_timestamp(time) }.merge(record) end log = dump_log(record) end when 'metrics' log = log_to_str(record[@log_key]) end unless log.nil? if messages_list.key?(key) messages_list[key].push(log) else messages_list[key] = [log] end end end # Push logs to sumo messages_list.each do |key, messages| source_name, source_category, source_host, fields = key[:source_name], key[:source_category], key[:source_host], key[:fields] # Merge custom and record fields if fields.nil? || fields.strip.length == 0 fields = @custom_fields else fields = [fields,@custom_fields].compact.join(",") end @sumo_conn.publish( messages.join("\n"), source_host =source_host, source_category =source_category, source_name =source_name, data_type =@data_type, metric_data_format =@metric_data_format, collected_fields =fields, dimensions =@custom_dimensions ) end end