class Fluent::Plugin::NewrelicOutput
Constants
- DEFAULT_BUFFER_TYPE
- DEFAULT_TIMEKEY
- DEFAULT_TIMEKEY_WAIT
- MAX_PAYLOAD_SIZE
Public Instance Methods
configure(conf)
Calls superclass method
# File lib/fluent/plugin/out_newrelic.rb, line 55
def configure(conf)
  super
  @api_key ||= ENV["NEW_RELIC_API_KEY"]
  @license_key ||= ENV["NEW_RELIC_LICENSE_KEY"]
  if @api_key.nil? && @license_key.nil?
    raise Fluent::ConfigError.new("'api_key' or 'license_key' parameter is required")
  end

  # create initial sockets hash and socket based on config param
  @end_point = URI.parse(@base_uri)
  auth = {
    @api_key.nil? ? 'X-License-Key' : 'X-Insert-Key' =>
      @api_key.nil? ? @license_key : @api_key
  }
  @header = {
    'X-Event-Source' => 'logs',
    'Content-Encoding' => 'gzip'
  }.merge(auth)
   .freeze
end
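A sketch of the headers this builds, assuming license_key is set to an illustrative value and api_key is not (when api_key is used instead, the auth header name becomes 'X-Insert-Key'):

  # Illustrative only; "abc123" is a made-up credential.
  @header
  # => { 'X-Event-Source' => 'logs', 'Content-Encoding' => 'gzip', 'X-License-Key' => 'abc123' }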
handle_response(response)
# File lib/fluent/plugin/out_newrelic.rb, line 117
def handle_response(response)
  if !(200 <= response.code.to_i && response.code.to_i < 300)
    log.error("Response was " + response.code + " " + response.body)
  end
end
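A minimal sketch of the status check, using a stand-in for the Net::HTTPResponse argument (FakeResponse is an assumption for illustration, not part of the plugin):

  FakeResponse = Struct.new(:code, :body)             # stand-in for Net::HTTPResponse (assumption)
  handle_response(FakeResponse.new("202", ""))        # 2xx: nothing is logged
  handle_response(FakeResponse.new("403", "denied"))  # non-2xx: logs "Response was 403 denied"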
multi_workers_ready?()
This tells Fluentd that it can run this output plugin in multiple workers. Our plugin has no interactions with other processes, so running in parallel is safe.
# File lib/fluent/plugin/out_newrelic.rb, line 51
def multi_workers_ready?
  true
end
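For reference, multi-worker operation is turned on in the Fluentd system configuration rather than in the plugin itself; a standard Fluentd setting, shown here only for illustration:

  <system>
    workers 2
  </system>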
package_record(record, timestamp)
# File lib/fluent/plugin/out_newrelic.rb, line 77
def package_record(record, timestamp)
  if defined? timestamp.nsec
    timestamp = timestamp * 1000 + timestamp.nsec / 1_000_000
  end
  packaged = {
    'timestamp' => timestamp,
    # non-intrinsic attributes get put into 'attributes'
    'attributes' => record
  }

  # intrinsic attributes go at the top level
  if record.has_key?('message')
    packaged['message'] = record['message']
    packaged['attributes'].delete('message')
  end

  # Kubernetes logging puts the message field in the 'log' attribute, we'll use that
  # as the 'message' field if it exists. We do the same in the Fluent Bit output plugin.
  # See https://docs.docker.com/config/containers/logging/fluentd/
  if record.has_key?('log')
    packaged['message'] = record['log']
    packaged['attributes'].delete('log')
  end

  packaged
end
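A sketch of the transformation on a Kubernetes-style record (the record contents and the millisecond value are made up for illustration):

  package_record({ 'log' => 'pod started', 'pod' => 'web-1' }, Fluent::EventTime.now)
  # => {
  #      'timestamp'  => 1700000000123,            # EventTime converted to milliseconds
  #      'attributes' => { 'pod' => 'web-1' },     # 'log' removed from attributes
  #      'message'    => 'pod started'             # promoted to the top level
  #    }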
send_payload(payload)
# File lib/fluent/plugin/out_newrelic.rb, line 123
def send_payload(payload)
  http = Net::HTTP.new(@end_point.host, 443)
  http.use_ssl = true
  http.verify_mode = OpenSSL::SSL::VERIFY_PEER
  request = Net::HTTP::Post.new(@end_point.request_uri, @header)
  request.body = payload
  handle_response(http.request(request))
end
write(chunk)
# File lib/fluent/plugin/out_newrelic.rb, line 104
def write(chunk)
  logs = []
  chunk.msgpack_each do |ts, record|
    next unless record.is_a? Hash
    next if record.empty?
    logs.push(package_record(record, ts))
  end

  payloads = get_compressed_payloads(logs)
  payloads.each { |payload| send_payload(payload) }
end
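The same pipeline can be hand-driven outside Fluentd; in this sketch, plugin stands for a configured NewrelicOutput instance, and the record and timestamp are assumptions for illustration:

  logs     = [plugin.package_record({ 'message' => 'hello' }, Fluent::EventTime.now)]
  payloads = plugin.send(:get_compressed_payloads, logs)   # private helper, hence #send
  payloads.each { |payload| plugin.send_payload(payload) }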
Private Instance Methods
compress(payload)
# File lib/fluent/plugin/out_newrelic.rb, line 172
def compress(payload)
  io = StringIO.new
  gzip = Zlib::GzipWriter.new(io)

  # Fluentd can run with a version of Ruby (2.1.0) whose to_json method doesn't support non-ASCII characters.
  # So we use Yajl, which can handle all Unicode characters. Apparently this library is what Fluentd uses
  # internally, so it is installed by default with td-agent.
  # See https://github.com/fluent/fluentd/issues/215
  gzip << Yajl.dump([payload])
  gzip.close
  io.string
end
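To inspect what the compressed string holds, a round trip through Zlib::GzipReader works; a sketch only, assuming Yajl.load is available as the counterpart of the Yajl.dump call above:

  compressed = compress('logs' => [], 'common' => {})
  Yajl.load(Zlib::GzipReader.new(StringIO.new(compressed)).read)
  # => [{"logs"=>[], "common"=>{}}]   # note the enclosing array added by compress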
create_payload(logs)
# File lib/fluent/plugin/out_newrelic.rb, line 158
def create_payload(logs)
  {
    'common' => {
      'attributes' => {
        'plugin' => {
          'type' => 'fluentd',
          'version' => NewrelicFluentdOutput::VERSION,
        }
      }
    },
    'logs' => logs
  }
end
get_compressed_payloads(logs)
# File lib/fluent/plugin/out_newrelic.rb, line 134
def get_compressed_payloads(logs)
  return [] if logs.length == 0

  payload = create_payload(logs)
  compressed_payload = compress(payload)
  if compressed_payload.bytesize <= MAX_PAYLOAD_SIZE
    return [compressed_payload]
  end

  compressed_payload = nil # Free for GC

  if logs.length > 1
    # we can split
    # let's split logs array by half, and try to create payloads again
    midpoint = logs.length / 2
    first_half = get_compressed_payloads(logs.slice(0, midpoint))
    second_half = get_compressed_payloads(logs.slice(midpoint, logs.length))
    return first_half + second_half
  else
    log.error("Can't compress record below required maximum packet size and it will be discarded. Record: #{logs[0]}")
    return []
  end
end
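For intuition, the halving on a five-record batch that is still too large after compression works out like this (each half is then compressed and, if needed, split again):

  midpoint = 5 / 2                      # => 2
  [0, 1, 2, 3, 4].slice(0, midpoint)    # => [0, 1]     first recursive call
  [0, 1, 2, 3, 4].slice(midpoint, 5)    # => [2, 3, 4]  second recursive call

A single record that exceeds MAX_PAYLOAD_SIZE on its own cannot be split further and is dropped with an error log.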