class Scalyr::ScalyrOut

Public Instance Methods

build_add_events_body(chunk) click to toggle source
# File lib/fluent/plugin/out_scalyr.rb, line 302
def build_add_events_body(chunk)
  # requests
  requests = []

  # set of unique scalyr threads for this chunk
  current_threads = {}

  # byte count
  total_bytes = 0

  # create a Scalyr event object for each record in the chunk
  events = []
  chunk.msgpack_each {|(tag, sec, nsec, record)| # rubocop:disable Metrics/BlockLength
    timestamp = to_nanos(sec, nsec)

    thread_id = tag

    # then update the map of threads for this chunk
    current_threads[tag] = thread_id

    # add a logfile field if one doesn't exist
    record["logfile"] = "/fluentd/#{tag}" unless record.key? "logfile"

    # append to list of events
    event = {thread: thread_id.to_s,
             ts:     timestamp,
             attrs:  record}

    # get json string of event to keep track of how many bytes we are sending

    begin
      event_json = event.to_json
    rescue JSON::GeneratorError, Encoding::UndefinedConversionError => e
      $log.warn "JSON serialization of the event failed: #{e.class}: #{e.message}"

      # Send the faulty event to a label @ERROR block and allow to handle it there (output to exceptions file for ex)
      time = Fluent::EventTime.new(sec, nsec)
      router.emit_error_event(tag, time, record, e)

      # Print attribute values for debugging / troubleshooting purposes
      $log.debug "Event attributes:"

      event[:attrs].each do |key, value|
        # NOTE: value doesn't always value.encoding attribute so we use .class which is always available
        $log.debug "\t#{key} (#{value.class}): '#{value}'"
      end

      # Recursively re-encode and sanitize potentially bad string values
      event[:attrs] = sanitize_and_reencode_value(event[:attrs])
      event_json = event.to_json
    end

    # generate new request if json size of events in the array exceed maximum request buffer size
    append_event = true
    if total_bytes + event_json.bytesize > @max_request_buffer
      # make sure we always have at least one event
      if events.empty?
        events << event
        append_event = false
      end
      request = create_request(events, current_threads)
      requests << request

      total_bytes = 0
      current_threads = {}
      events = []
    end

    # if we haven't consumed the current event already
    # add it to the end of our array and keep track of the json bytesize
    if append_event
      events << event
      total_bytes += event_json.bytesize
    end
  }

  # create a final request with any left over events
  request = create_request(events, current_threads)
  requests << request
end
compat_parameters_default_chunk_key() click to toggle source

support for version 0.14.0:

# File lib/fluent/plugin/out_scalyr.rb, line 62
# Returns the default chunk key: an empty string.
# NOTE(review): presumably a hook read by Fluentd's compat-parameters helper
# (the surrounding docs mention "support for version 0.14.0") — confirm.
def compat_parameters_default_chunk_key
  ""
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_scalyr.rb, line 74
# Validates and post-processes the plugin configuration.
#
# Steps, in order: warn about configurations without a <buffer> element,
# apply the buffer compatibility parameters, run the superclass
# configuration, warn about oversized buffers, resolve the forced message
# encoding, evaluate server_attributes, optionally default serverHost to the
# local hostname, build the addEvents URI and finally reject configurations
# with more than one flush thread.
#
# @param conf the configuration object; must respond to elements("buffer")
# @raise [Fluent::ConfigError] if the buffer's flush_thread_count is above 1
def configure(conf)
  # no <buffer> element present in the configuration
  if conf.elements("buffer").empty?
    $log.warn "Pre 0.14.0 configuration file detected.  Please consider updating your configuration file" # rubocop:disable Layout/LineLength, Lint/RedundantCopDisableDirective
  end

  compat_parameters_buffer(conf, default_chunk_key: "")

  # instance variables read below (@buffer, @max_request_buffer, ...) are
  # not assigned in this method; presumably the superclass populates them — confirm
  super

  if @buffer.chunk_limit_size > 6_000_000
    $log.warn "Buffer chunk size is greater than 6Mb.  This may result in requests being rejected by Scalyr" # rubocop:disable Layout/LineLength, Lint/RedundantCopDisableDirective
  end

  if @max_request_buffer > 6_000_000
    $log.warn "Maximum request buffer > 6Mb.  This may result in requests being rejected by Scalyr" # rubocop:disable Layout/LineLength, Lint/RedundantCopDisableDirective
  end

  # resolve the configured encoding name; stays nil when unset or unknown
  @message_encoding = nil
  if @force_message_encoding.to_s != ""
    begin
      @message_encoding = Encoding.find(@force_message_encoding)
      $log.debug "Forcing message encoding to '#{@force_message_encoding}'"
    rescue ArgumentError
      # Encoding.find raises ArgumentError for unknown encoding names
      $log.warn "No encoding '#{@force_message_encoding}' found.  Ignoring"
    end
  end

  # evaluate any statements in string value of the server_attributes object
  if @server_attributes
    new_attributes = {}
    @server_attributes.each do |key, value|
      # NOTE(review): non-String values are skipped here and therefore do not
      # end up in new_attributes at all — confirm that dropping them is intended
      next unless value.is_a?(String)

      # a value of the form "#{expression}" is evaluated as Ruby code
      m = /^\#{(.*)}$/.match(value)
      new_attributes[key] = if m
                              # SECURITY: evaluates text taken from the configuration as Ruby
                              # code, in this method's binding (locals such as conf, key and
                              # value are visible to it). Only safe while the configuration
                              # file is fully trusted.
                              eval(m[1]) # rubocop:disable Security/Eval
                            else
                              value
                            end
    end
    @server_attributes = new_attributes
  end

  # See if we should use the hostname as the server_attributes.serverHost
  if @use_hostname_for_serverhost

    # ensure server_attributes is not nil
    @server_attributes = {} if @server_attributes.nil?

    # only set serverHost if it doesn't currently exist in server_attributes
    # Note: Use strings rather than symbols for the key, because keys coming
    # from the config file will be strings
    unless @server_attributes.key? "serverHost"
      @server_attributes["serverHost"] = Socket.gethostname
    end
  end

  # appends in place, i.e. mutates the string object held by @scalyr_server
  @scalyr_server << "/" unless @scalyr_server.end_with?("/")

  @add_events_uri = URI @scalyr_server + "addEvents"

  num_threads = @buffer_config.flush_thread_count

  # forcibly limit the number of threads to 1 for now, to ensure requests always have incrementing timestamps
  if num_threads > 1
    raise Fluent::ConfigError, "num_threads is currently limited to 1. You specified #{num_threads}."
  end
end
create_request(events, current_threads) click to toggle source
# File lib/fluent/plugin/out_scalyr.rb, line 383
# Builds one request payload from a batch of events.
#
# @param events [Array] event hashes to send
# @param current_threads [Hash] tag => thread id for the threads of this batch
# @return [Hash] {body: <JSON string>, record_count: <number of events>}
def create_request(events, current_threads)
  # one Scalyr thread descriptor per tag
  thread_entries = current_threads.map do |tag, id|
    {id: id.to_s, name: "Fluentd: #{tag}"}
  end

  client_timestamp = to_millis(Fluent::Engine.now).to_s

  payload = {
    token:            @api_write_token,
    client_timestamp: client_timestamp,
    session:          @session,
    events:           events,
    threads:          thread_entries
  }

  # the session info is only sent when server attributes are configured
  payload[:sessionInfo] = @server_attributes if @server_attributes

  {body: payload.to_json, record_count: events.size}
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_scalyr.rb, line 151
# Serializes one event for buffering.
#
# Normalizes the timestamp, optionally renames the configured message field
# to "message", optionally re-encodes the message, and packs the result.
#
# @param tag the event tag
# @param time the event time: nil (the engine's current time is used), an
#   Integer or Float number of seconds, or an object responding to sec/nsec
# @param record [Hash] the event record; it is modified in place
# @return the result of [tag, sec, nsec, record].to_msgpack
# @raise [JSON::GeneratorError] re-raised after logging the record
def format(tag, time, record)
  time = Fluent::Engine.now if time.nil?

  # handle timestamps that are not EventTime types
  if time.is_a?(Integer)
    time = Fluent::EventTime.new(time)
  elsif time.is_a?(Float)
    components = time.divmod 1 # get integer and decimal components
    sec = components[0].to_i
    # truncate (not round) so the result can never reach a full second
    nsec = (components[1] * 10**9).to_i
    time = Fluent::EventTime.new(sec, nsec)
  end

  # move the configured message field to "message"
  if @message_field != "message"
    if record.key? @message_field
      if record.key? "message"
        $log.warn "Overwriting log record field 'message'.  You are seeing this warning because in your fluentd config file you have configured the '#{@message_field}' field to be converted to the 'message' field, but the log record already contains a field called 'message' and this is now being overwritten." # rubocop:disable Layout/LineLength, Lint/RedundantCopDisableDirective
      end
      record["message"] = record[@message_field]
      record.delete(@message_field)
    end
  end

  # Only String messages can be re-encoded; other truthy values (numbers,
  # hashes, ...) do not respond to encode/force_encoding and are passed
  # through untouched instead of raising NoMethodError.
  message = record["message"]
  if @message_encoding && record.key?("message") && message.is_a?(String)
    if @replace_invalid_utf8 && (@message_encoding == Encoding::UTF_8)
      record["message"] = message.encode("UTF-8", invalid: :replace, undef: :replace, replace: "<?>").force_encoding("UTF-8") # rubocop:disable Layout/LineLength, Lint/RedundantCopDisableDirective
    else
      # re-tags the existing string in place; the bytes are not converted
      message.force_encoding(@message_encoding)
    end
  end
  [tag, time.sec, time.nsec, record].to_msgpack
rescue JSON::GeneratorError
  $log.warn "Unable to format message due to JSON::GeneratorError.  Record is:\n\t#{record}"
  raise
end
formatted_to_msgpack_binary() click to toggle source
# File lib/fluent/plugin/out_scalyr.rb, line 66
# Always returns true.
# NOTE(review): presumably tells Fluentd that the strings produced by #format
# are msgpack binary (format ends with to_msgpack) — confirm against the
# Fluentd output plugin API.
def formatted_to_msgpack_binary
  true
end
handle_response(response) click to toggle source
# File lib/fluent/plugin/out_scalyr.rb, line 264
# Inspects a server response and raises when the request was not accepted.
#
# @param response an object responding to code (a String such as "200") and
#   body (expected to be a JSON object with a "status" field)
# @return [nil] when the status is "success" with a 200 code, or when the
#   server asked to discard the buffer
# @raise [Scalyr::Client4xxError] for any 4xx response code
# @raise [Scalyr::ClientError] when the status contains "/client/"
# @raise [Scalyr::ServerError] for every other failure, including a body
#   that is not a JSON object or has no "status" field
def handle_response(response)
  $log.debug "Response Code: #{response.code}"
  $log.debug "Response Body: #{response.body}"

  response_hash =
    begin
      JSON.parse(response.body)
    rescue StandardError
      nil
    end

  # Unparseable bodies and valid JSON that is not an object (e.g. `[]`,
  # `null`, `"ok"`) are both treated as an invalid response; without this a
  # non-Hash value would fail with NoMethodError on key? below.
  unless response_hash.is_a?(Hash)
    response_hash = {"status" => "Invalid JSON response from server"}
  end

  # make sure the JSON response has a "status" field
  unless response_hash.key? "status"
    $log.debug "JSON response does not contain status message"
    raise Scalyr::ServerError, "JSON response does not contain status message"
  end

  status = response_hash["status"]

  # 4xx codes are handled separately
  raise Scalyr::Client4xxError, status if response.code =~ /^4\d\d/

  if status != "success"
    # to_s keeps the pattern checks safe when the status is not a String
    status_text = status.to_s
    if status_text =~ /discardBuffer/
      $log.warn "Received 'discardBuffer' message from server.  Buffer dropped."
    elsif status_text =~ %r{/client/}i
      raise Scalyr::ClientError, status
    else # don't check specifically for server, we assume all non-client errors are server errors
      raise Scalyr::ServerError, status
    end
  elsif !response.code.include? "200" # response code is a string not an int
    raise Scalyr::ServerError
  end
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_scalyr.rb, line 70
# Always returns true.
# NOTE(review): presumably signals to Fluentd that this plugin may run with
# multiple workers — confirm against the Fluentd plugin API.
def multi_workers_ready?
  true
end
post_request(uri, body) click to toggle source
# File lib/fluent/plugin/out_scalyr.rb, line 225
# Sends a request body to the given URI as an HTTPS POST.
#
# The body is compressed first when @compression_type is "deflate" or "bz2".
#
# @param uri an object responding to host, port and path
# @param body [String] the JSON payload
# @return the response object returned by Net::HTTP#request
def post_request(uri, body)
  https = Net::HTTP.new(uri.host, uri.port)
  https.use_ssl = true

  # verify peers to prevent potential MITM attacks
  # NOTE(review): when @ssl_verify_peer is false nothing is configured here,
  # so Net::HTTP's own default verify mode applies — confirm that this is the
  # intended meaning of disabling the option.
  if @ssl_verify_peer
    https.ca_file = @ssl_ca_bundle_path unless @ssl_ca_bundle_path.nil?
    https.ssl_version = :TLSv1_2
    https.verify_mode = OpenSSL::SSL::VERIFY_PEER
    https.verify_depth = @ssl_verify_depth
  end

  # use compression if enabled; encoding stays nil when the body is sent as-is
  encoding = nil

  case @compression_type
  when "deflate"
    encoding = "deflate"
    body = Zlib::Deflate.deflate(body, @compression_level)
  when "bz2"
    encoding = "bz2"
    io = StringIO.new
    bz2 = RBzip2.default_adapter::Compressor.new io
    bz2.write body
    bz2.close
    body = io.string
  end

  post = Net::HTTP::Post.new uri.path
  post.add_field("Content-Type", "application/json")

  # Only announce an encoding that was actually applied: an unsupported
  # @compression_type leaves the body uncompressed and must not produce a
  # Content-Encoding header.
  post.add_field("Content-Encoding", encoding) if encoding

  post.body = body

  https.request(post)
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_scalyr.rb, line 143
# Starts the plugin: runs the superclass start, assigns a fresh random
# session id to @session and logs the plugin id, worker id and session id.
def start
  super
  # Generate a session id.  This will be called once for each <match> in fluent.conf that uses scalyr
  @session = SecureRandom.uuid

  $log.info "Scalyr Fluentd Plugin ID id=#{plugin_id} worker=#{fluentd_worker_id} session=#{@session}" # rubocop:disable Layout/LineLength, Lint/RedundantCopDisableDirective
end
to_millis(timestamp) click to toggle source

An explicit function for converting to milliseconds makes things easier to maintain if/when fluentd supports resolutions finer than one second.

# File lib/fluent/plugin/out_scalyr.rb, line 221
# Converts a timestamp to whole milliseconds.
#
# @param timestamp an object responding to sec and nsec
# @return the seconds scaled to milliseconds plus the nanoseconds divided
#   down to milliseconds
def to_millis(timestamp)
  millis_from_seconds = timestamp.sec * 1_000
  millis_from_nanos = timestamp.nsec / 1_000_000
  millis_from_seconds + millis_from_nanos
end
to_nanos(seconds, nsec) click to toggle source

An explicit function for converting to nanoseconds makes things easier to maintain if/when fluentd supports resolutions finer than one second.

# File lib/fluent/plugin/out_scalyr.rb, line 215
# Combines a seconds value and a nanoseconds remainder into nanoseconds.
#
# @param seconds the seconds part
# @param nsec the nanoseconds part
# @return seconds scaled to nanoseconds plus nsec
def to_nanos(seconds, nsec)
  nanos_per_second = 1_000_000_000
  seconds * nanos_per_second + nsec
end
write(chunk) click to toggle source

called by fluentd when a chunk of log messages is ready

# File lib/fluent/plugin/out_scalyr.rb, line 188
# Sends a buffer chunk: splits it into requests with build_add_events_body
# and posts each request in turn.
#
# SSL errors and 4xx client errors are logged and swallowed, so the affected
# request is not retried; any other error raised while posting or handling
# the response propagates to the caller.
#
# @param chunk the buffer chunk; must respond to size
# @raise [JSON::GeneratorError] re-raised after logging a warning
def write(chunk)
  $log.debug "Size of chunk is: #{chunk.size}"
  requests = build_add_events_body(chunk)
  $log.debug "Chunk split into #{requests.size} request(s)."

  requests.each_with_index do |request, index|
    $log.debug "Request #{index + 1}/#{requests.size}: #{request[:body].bytesize} bytes"
    begin
      response = post_request(@add_events_uri, request[:body])
      handle_response(response)
    rescue OpenSSL::SSL::SSLError => e
      # give a configuration hint for the most common SSL failure
      $log.warn "SSL certificate verification failed.  Please make sure your certificate bundle is configured correctly and points to a valid file. You can configure this with the ssl_ca_bundle_path configuration option. The current value of ssl_ca_bundle_path is '#{@ssl_ca_bundle_path}'" if e.message.include?("certificate verify failed") # rubocop:disable Layout/LineLength, Lint/RedundantCopDisableDirective
      $log.warn e.message
      $log.warn "Discarding buffer chunk without retrying or logging to <secondary>"
    rescue Scalyr::Client4xxError => e
      # response is set here: the error is raised by handle_response, after the post returned
      $log.warn "4XX status code received for request #{index + 1}/#{requests.size}.  Discarding buffer without retrying or logging.\n\t#{response.code} - #{e.message}\n\tChunk Size: #{chunk.size}\n\tLog messages this request: #{request[:record_count]}\n\tJSON payload size: #{request[:body].bytesize}\n\tSample: #{request[:body][0, 1024]}..."
    end
  end
rescue JSON::GeneratorError
  $log.warn "Unable to format message due to JSON::GeneratorError."
  raise
end