class ScalyrThreaded::ScalyrOut

Public Instance Methods

build_add_events_body( chunk ) click to toggle source
# File lib/fluent/plugin/out_scalyr_threaded.rb, line 320
# Converts a buffer chunk into one or more Scalyr addEvents request payloads.
#
# Every record becomes a Scalyr event ({thread, ts, attrs}). Events are
# batched until adding the next one would push the batch's JSON size past
# @max_request_buffer; the batch is then turned into a request via
# create_request and a new batch is started. An event that on its own
# exceeds the limit is sent in a request by itself.
#
# chunk - buffer chunk whose msgpack_each yields [tag, sec, nsec, record]
#
# Returns an Array of requests as built by create_request (empty when the
# chunk contains no records).
def build_add_events_body( chunk )
  requests = []

  # Scalyr threads (tag => thread id) referenced by the events of the current batch
  current_threads = {}
  # JSON byte count of the events in the current batch
  total_bytes = 0
  # events of the current batch
  events = []

  chunk.msgpack_each {|(tag, sec, nsec, record)|
    timestamp = self.to_nanos( sec, nsec )
    thread_id = nil

    # timestamps and thread ids are session-wide state shared between calls; guard with @sync
    @sync.synchronize {
      # ensure timestamp is at least 1 nanosecond greater than the last one
      timestamp = [timestamp, @last_timestamp + 1].max
      @last_timestamp = timestamp

      # get thread id or add a new one if we haven't seen this tag before
      if @thread_ids.key? tag
        thread_id = @thread_ids[tag]
      else
        thread_id = @next_id
        @thread_ids[tag] = thread_id
        @next_id += 1
      end
    }

    # add a logfile field if one doesn't exist
    record["logfile"] = "/fluentd/#{tag}" unless record.key? "logfile"

    event = { :thread => thread_id.to_s,
              :ts => timestamp,
              :attrs => record
            }

    # serialize now so we know how many bytes this event adds to a request
    begin
      event_json = event.to_json
    rescue JSON::GeneratorError, Encoding::UndefinedConversionError => e
      $log.warn "#{e.class}: #{e.message}"

      # Send the faulty event to a label @ERROR block and allow to handle it there (output to exceptions file for ex)
      time = Fluent::EventTime.new( sec, nsec )
      router.emit_error_event(tag, time, record, e)

      # then replace unconvertible characters so the event can still be sent
      event[:attrs].each do |key, value|
        # only Strings carry an encoding; numbers, booleans, nil and nested objects are left as-is
        next unless value.is_a?( String )
        $log.debug "\t#{key} (#{value.encoding.name}): '#{value}'"
        event[:attrs][key] = value.encode("UTF-8", :invalid => :replace, :undef => :replace, :replace => "<?>").force_encoding('UTF-8')
      end
      event_json = event.to_json
    end

    event_bytes = event_json.bytesize
    append_event = true

    # flush the current batch into a request if this event would exceed the limit
    if total_bytes + event_bytes > @max_request_buffer
      if events.empty?
        # make sure every request carries at least one event, even an oversized one
        events << event
        current_threads[tag] = thread_id
        append_event = false
      end
      requests << self.create_request( events, current_threads )

      total_bytes = 0
      current_threads = {}
      events = []
    end

    # register the event AND its thread with the batch the event actually lands in,
    # so every request describes exactly the threads its events reference
    if append_event
      events << event
      current_threads[tag] = thread_id
      total_bytes += event_bytes
    end
  }

  # create a final request with any left over events (never an empty one)
  requests << self.create_request( events, current_threads ) unless events.empty?
  requests
end
compat_parameters_default_chunk_key() click to toggle source

Support for Fluentd version 0.14.0:

# File lib/fluent/plugin/out_scalyr_threaded.rb, line 62
# Default chunk key for the v0.14 compatibility layer: an empty string,
# i.e. no chunk key (matching the default_chunk_key: '' passed to
# compat_parameters_buffer in configure).
def compat_parameters_default_chunk_key; ''; end
configure( conf ) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_scalyr_threaded.rb, line 70
# Applies the plugin configuration and derives runtime settings.
#
# - Converts pre-0.14 buffer configuration via compat_parameters_buffer.
# - Warns about chunk / request sizes Scalyr may reject.
# - Resolves force_message_encoding into @message_encoding (nil if unset or unknown).
# - Evaluates "#{...}" expressions in server_attributes values.
# - Optionally fills server_attributes['serverHost'] from the local hostname.
# - Builds @add_events_uri from scalyr_server.
#
# conf - the Fluentd configuration element for this plugin
def configure( conf )

  if conf.elements('buffer').empty?
    $log.warn "Pre 0.14.0 configuration file detected.  Please consider updating your configuration file"
  end

  compat_parameters_buffer( conf, default_chunk_key: '' )

  super

  if @buffer.chunk_limit_size > 1024*1024
    $log.warn "Buffer chunk size is greater than 1Mb.  This may result in requests being rejected by Scalyr"
  end

  if @max_request_buffer > (1024*1024*3)
    $log.warn "Maximum request buffer > 3Mb.  This may result in requests being rejected by Scalyr"
  end

  @message_encoding = nil
  if @force_message_encoding.to_s != ''
    begin
      @message_encoding = Encoding.find( @force_message_encoding )
      $log.debug "Forcing message encoding to '#{@force_message_encoding}'"
    rescue ArgumentError
      $log.warn "No encoding '#{@force_message_encoding}' found.  Ignoring"
    end
  end

  # Evaluate string values of the form "#{expr}" in server_attributes; every other
  # value (plain strings, numbers, booleans, nested objects) is kept unchanged.
  # SECURITY NOTE: this deliberately evals Ruby taken from the fluentd config file,
  # so that file must come from a trusted source.
  if @server_attributes
    @server_attributes = @server_attributes.each_with_object({}) do |(key, value), evaluated|
      match = value.is_a?( String ) ? /^\#{(.*)}$/.match( value ) : nil
      evaluated[key] = match ? eval( match[1] ) : value
    end
  end

  # See if we should use the hostname as the server_attributes.serverHost
  if @use_hostname_for_serverhost
    @server_attributes ||= {}

    # only set serverHost if it doesn't currently exist in server_attributes
    # Note: Use strings rather than symbols for the key, because keys coming
    # from the config file will be strings
    @server_attributes['serverHost'] = Socket.gethostname unless @server_attributes.key?( 'serverHost' )
  end

  # build a new string instead of appending in place: the configured value may be frozen
  @scalyr_server = "#{@scalyr_server}/" unless @scalyr_server.end_with?( '/' )

  @add_events_uri = URI( @scalyr_server + "addEvents" )

  # NOTE: flush_thread_count is intentionally not restricted here (an earlier
  # single-thread limit is disabled). Timestamps are still assigned in
  # increasing order under @sync, but requests from parallel flush threads
  # may reach Scalyr in a different order.
end
create_request( events, current_threads ) click to toggle source
# File lib/fluent/plugin/out_scalyr_threaded.rb, line 417
# Builds one Scalyr addEvents request.
#
# events          - Array of event hashes to send
# current_threads - Hash of tag => thread id for the threads those events use
#
# Returns a Hash with :body (the JSON request string) and :record_count
# (number of events in the request). sessionInfo is included only when
# @server_attributes is set.
def create_request( events, current_threads )
  # one Scalyr thread descriptor per tag referenced by this request
  thread_descriptors = current_threads.map do |tag, id|
    { :id => id.to_s, :name => "Fluentd: #{tag}" }
  end

  payload = {
    :token => @api_write_token,
    :client_timestamp => self.to_millis( Fluent::Engine.now ).to_s,
    :session => @session,
    :events => events,
    :threads => thread_descriptors
  }
  payload[:sessionInfo] = @server_attributes if @server_attributes

  { :body => payload.to_json, :record_count => events.size }
end
format( tag, time, record ) click to toggle source
# File lib/fluent/plugin/out_scalyr_threaded.rb, line 154
# Serializes one record for the buffer as msgpack [tag, sec, nsec, record].
#
# - A nil time is replaced by Fluent::Engine.now; Integer and Float times are
#   converted to Fluent::EventTime.
# - If @message_field is not "message", that field is renamed to "message"
#   (warning when an existing "message" is overwritten).
# - If @message_encoding is set and "message" is a String, the message is
#   either re-encoded to UTF-8 with invalid/undefined bytes replaced by "<?>"
#   (when @replace_invalid_utf8 and the encoding is UTF-8) or relabelled with
#   @message_encoding. Non-String messages are left untouched.
def format( tag, time, record )
  time = Fluent::Engine.now if time.nil?

  # handle timestamps that are not EventTime types
  if time.is_a?( Integer )
    time = Fluent::EventTime.new( time )
  elsif time.is_a?( Float )
    whole, fraction = time.divmod( 1 ) # integer and fractional seconds
    time = Fluent::EventTime.new( whole.to_i, (fraction * 10**9).to_i )
  end

  if @message_field != "message" && record.key?( @message_field )
    if record.key? "message"
      $log.warn "Overwriting log record field 'message'.  You are seeing this warning because in your fluentd config file you have configured the '#{@message_field}' field to be converted to the 'message' field, but the log record already contains a field called 'message' and this is now being overwritten."
    end
    record["message"] = record.delete( @message_field )
  end

  message = record["message"]
  # only Strings carry an encoding; numbers, booleans, nil etc. pass through unchanged
  if @message_encoding && message.is_a?( String )
    if @replace_invalid_utf8 && @message_encoding == Encoding::UTF_8
      record["message"] = message.encode("UTF-8", :invalid => :replace, :undef => :replace, :replace => "<?>").force_encoding('UTF-8')
    else
      # relabel a copy: the original may be frozen or shared with other outputs
      record["message"] = message.dup.force_encoding( @message_encoding )
    end
  end

  [tag, time.sec, time.nsec, record].to_msgpack
rescue JSON::GeneratorError
  $log.warn "Unable to format message due to JSON::GeneratorError.  Record is:\n\t#{record.to_s}"
  raise
end
formatted_to_msgpack_binary() click to toggle source
# File lib/fluent/plugin/out_scalyr_threaded.rb, line 66
# format returns msgpack-encoded bytes ([tag, sec, nsec, record].to_msgpack), so report true.
def formatted_to_msgpack_binary; true; end
handle_response( response ) click to toggle source
# File lib/fluent/plugin/out_scalyr_threaded.rb, line 281
# Inspects an addEvents HTTP response and raises on failure.
#
# The body must be a JSON object with a "status" field. Any body that is not a
# JSON object (unparseable, array, bare scalar, null) is treated as status
# "Invalid JSON response from server".
#
# Raises ScalyrThreaded::Client4xxError for any 4xx HTTP code.
# Otherwise, for a non-"success" status:
#   - status containing "discardBuffer": logs a warning, no error
#   - status containing "/client/" (case-insensitive): ScalyrThreaded::ClientError
#   - anything else: ScalyrThreaded::ServerError
# A "success" status with a response code other than 200 raises ServerError.
def handle_response( response )
  $log.debug "Response Code: #{response.code}"
  $log.debug "Response Body: #{response.body}"

  response_hash = begin
    JSON.parse( response.body )
  rescue StandardError
    nil
  end

  # a body that is not a JSON object cannot carry a status; report it like unparseable JSON
  unless response_hash.is_a?( Hash )
    response_hash = { "status" => "Invalid JSON response from server" }
  end

  #make sure the JSON reponse has a "status" field
  if !response_hash.key? "status"
    $log.debug "JSON response does not contain status message"
    raise ScalyrThreaded::ServerError.new "JSON response does not contain status message"
  end

  status = response_hash["status"]

  #4xx codes are handled separately
  if response.code =~ /^4\d\d/
    raise ScalyrThreaded::Client4xxError.new status
  elsif status != "success"
    if status =~ /discardBuffer/
      $log.warn "Received 'discardBuffer' message from server.  Buffer dropped."
    elsif status =~ %r"/client/"i
      raise ScalyrThreaded::ClientError.new status
    else #don't check specifically for server, we assume all non-client errors are server errors
      raise ScalyrThreaded::ServerError.new status
    end
  elsif !response.code.include? "200" #response code is a string not an int
    raise ScalyrThreaded::ServerError
  end
end
post_request( uri, body ) click to toggle source
# File lib/fluent/plugin/out_scalyr_threaded.rb, line 239
# POSTs a JSON body to uri over HTTPS and returns the Net::HTTP response.
#
# The body is compressed when @compression_type is 'deflate' (at
# @compression_level) or 'bz2'; the Content-Encoding header is sent only when
# one of those compressions was actually applied. Peer verification against
# @ssl_ca_bundle_path is enabled when @ssl_verify_peer is set.
#
# uri  - URI of the endpoint (only host, port and path are used)
# body - JSON request body String
def post_request( uri, body )

  https = Net::HTTP.new( uri.host, uri.port )
  https.use_ssl = true

  #verify peers to prevent potential MITM attacks
  if @ssl_verify_peer
    https.ca_file = @ssl_ca_bundle_path
    https.verify_mode = OpenSSL::SSL::VERIFY_PEER
    https.verify_depth = @ssl_verify_depth
  end

  #use compression if enabled; encoding stays nil for unset or unrecognized compression types
  encoding = nil
  case @compression_type
  when 'deflate'
    encoding = 'deflate'
    body = Zlib::Deflate.deflate( body, @compression_level )
  when 'bz2'
    encoding = 'bz2'
    io = StringIO.new
    bz2 = RBzip2.default_adapter::Compressor.new io
    bz2.write body
    bz2.close
    body = io.string
  end

  post = Net::HTTP::Post.new uri.path
  post.add_field( 'Content-Type', 'application/json' )

  # only advertise an encoding when the body was really compressed; a nil value
  # would otherwise be sent as an empty Content-Encoding header
  post.add_field( 'Content-Encoding', encoding ) if encoding

  post.body = body

  https.request( post )

end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_scalyr_threaded.rb, line 140
# Starts the plugin: logs the plugin id, creates a fresh Scalyr session id
# and initializes the state used to assign thread ids and timestamps.
def start
  super
  $log.info "Scalyr Threaded Fluentd Plugin ID - #{plugin_id}"

  # new session id each time start runs (once per <match> using this plugin)
  @session = SecureRandom.uuid

  # @thread_ids, @next_id and @last_timestamp must only be accessed while holding @sync
  @sync = Mutex.new
  @thread_ids = {}     # tag => scalyr thread id
  @next_id = 1         # next thread id to hand out in this session
  @last_timestamp = 0  # most recent event timestamp, nanoseconds since epoch
end
to_millis( timestamp ) click to toggle source

Explicit conversion to milliseconds; keeping it in one function will make things easier to maintain if/when Fluentd supports higher-than-second resolution.

# File lib/fluent/plugin/out_scalyr_threaded.rb, line 235
# Converts a timestamp responding to #sec and #nsec into milliseconds since
# the epoch; the nanosecond part is reduced with integer division.
def to_millis( timestamp )
  millis_from_seconds = timestamp.sec * 1_000
  millis_from_nanos = timestamp.nsec / 1_000_000
  millis_from_seconds + millis_from_nanos
end
to_nanos( seconds, nsec ) click to toggle source

Explicit conversion to nanoseconds; keeping it in one function will make things easier to maintain if/when Fluentd supports higher-than-second resolution.

# File lib/fluent/plugin/out_scalyr_threaded.rb, line 229
# Combines whole seconds and a nanosecond offset into nanoseconds since the epoch.
def to_nanos( seconds, nsec )
  nsec + seconds * 1_000_000_000
end
write( chunk ) click to toggle source

Called by Fluentd when a chunk of log messages is ready to be sent.

# File lib/fluent/plugin/out_scalyr_threaded.rb, line 197
# Sends a buffer chunk to Scalyr, split into one or more addEvents requests.
#
# SSL errors and 4xx responses are logged and that request is dropped
# (the loop continues with the next request). Any other error raised by
# post_request / handle_response propagates to Fluentd. JSON generation
# errors are logged and re-raised.
def write( chunk )
  $log.debug "Size of chunk is: #{chunk.size}"
  requests = self.build_add_events_body( chunk )
  total = requests.size
  $log.debug "Chunk split into #{total} request(s)."

  requests.each_with_index do |request, index|
    position = index + 1
    payload = request[:body]
    $log.debug "Request #{position}/#{total}: #{payload.bytesize} bytes"
    begin
      response = self.post_request( @add_events_uri, payload )
      self.handle_response( response )
    rescue OpenSSL::SSL::SSLError => e
      if e.message.include?( "certificate verify failed" )
        $log.warn "SSL certificate verification failed.  Please make sure your certificate bundle is configured correctly and points to a valid file. You can configure this with the ssl_ca_bundle_path configuration option. The current value of ssl_ca_bundle_path is '#{@ssl_ca_bundle_path}'"
      end
      $log.warn e.message
      $log.warn "Discarding buffer chunk without retrying or logging to <secondary>"
    rescue ScalyrThreaded::Client4xxError => e
      $log.warn "4XX status code received for request #{position}/#{total}.  Discarding buffer without retrying or logging.\n\t#{response.code} - #{e.message}\n\tChunk Size: #{chunk.size}\n\tLog messages this request: #{request[:record_count]}\n\tJSON payload size: #{payload.bytesize}\n\tSample: #{payload[0,1024]}..."
    end
  end
rescue JSON::GeneratorError
  $log.warn "Unable to format message due to JSON::GeneratorError."
  raise
end