class ScalyrThreaded::ScalyrOut
Public Instance Methods
build_add_events_body( chunk )
click to toggle source
# File lib/fluent/plugin/out_scalyr_threaded.rb, line 320
# Turns every record in +chunk+ into a Scalyr addEvents event and groups the
# events into request bodies built by create_request. A new request is started
# whenever the next event's JSON would push a non-empty request past
# @max_request_buffer bytes; an event that is larger than the limit on its own
# is still sent, alone in its request.
#
# Timestamps (nanoseconds) are bumped so each is at least 1ns greater than the
# previous one handed out, and each tag keeps a stable thread id; both are
# allocated under @sync.
#
# chunk - buffer chunk yielding [tag, sec, nsec, record] from #msgpack_each.
#         Records are modified in place (a "logfile" field may be added).
#
# Returns an Array of request hashes. Empty batches never become requests, so
# an empty chunk yields [].
def build_add_events_body( chunk )
  requests = []
  events = []
  current_threads = {} # tag => thread id for the request being built
  total_bytes = 0      # JSON bytes of the events in the request being built

  chunk.msgpack_each do |(tag, sec, nsec, record)|
    timestamp = to_nanos( sec, nsec )
    thread_id = nil

    @sync.synchronize do
      # ensure timestamp is at least 1 nanosecond greater than the last one
      timestamp = [timestamp, @last_timestamp + 1].max
      @last_timestamp = timestamp

      # get thread id or add a new one if we haven't seen this tag before
      if @thread_ids.key?( tag )
        thread_id = @thread_ids[tag]
      else
        thread_id = @next_id
        @thread_ids[tag] = thread_id
        @next_id += 1
      end
    end

    # add a logfile field if one doesn't exist
    record["logfile"] = "/fluentd/#{tag}" unless record.key?( "logfile" )

    event = { :thread => thread_id.to_s, :ts => timestamp, :attrs => record }

    # serialize now so the size of the request being built can be tracked
    begin
      event_json = event.to_json
    rescue JSON::GeneratorError, Encoding::UndefinedConversionError => e
      $log.warn "#{e.class}: #{e.message}"

      # Send the faulty event to a label @ERROR block and allow to handle it there (output to exceptions file for ex)
      time = Fluent::EventTime.new( sec, nsec )
      router.emit_error_event(tag, time, record, e)

      # Then sanitize top-level String attributes and retry. Non-String values
      # have no #encoding and are left alone. String#encode into the string's
      # own encoding is a no-op that keeps invalid bytes, so UTF-8 and binary
      # (ASCII-8BIT) values are reinterpreted as UTF-8 and scrubbed instead.
      event[:attrs].each do |key, value|
        next unless value.is_a?( String )
        $log.debug "\t#{key} (#{value.encoding.name}): '#{value}'"
        event[:attrs][key] =
          if value.encoding == Encoding::UTF_8 || value.encoding == Encoding::ASCII_8BIT
            value.dup.force_encoding( Encoding::UTF_8 ).scrub( "<?>" )
          else
            value.encode( "UTF-8", :invalid => :replace, :undef => :replace, :replace => "<?>" )
          end
      end
      event_json = event.to_json
    end

    event_bytes = event_json.bytesize

    # flush the current request first if this event would push it past the limit
    if !events.empty? && total_bytes + event_bytes > @max_request_buffer
      requests << create_request( events, current_threads )
      events = []
      current_threads = {}
      total_bytes = 0
    end

    # Register the event's thread *after* any flush, so the thread descriptor
    # is sent in the same request as the event that references it.
    events << event
    current_threads[tag] = thread_id
    total_bytes += event_bytes
  end

  # create a final request with any left over events
  requests << create_request( events, current_threads ) unless events.empty?
  requests
end
compat_parameters_default_chunk_key()
click to toggle source
Support for Fluentd version 0.14.0: returns the default buffer chunk key (an empty string).
# File lib/fluent/plugin/out_scalyr_threaded.rb, line 62
# Default buffer chunk key for the compat_parameters helper: the empty string,
# the same value configure passes as default_chunk_key.
def compat_parameters_default_chunk_key
  ''
end
configure( conf )
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_scalyr_threaded.rb, line 70
# Validates and post-processes the plugin configuration.
#
# Upgrades pre-0.14 buffer parameters, calls the superclass #configure, warns
# about sizes Scalyr may reject, resolves force_message_encoding, evaluates
# server_attributes values of the exact form "#{expr}", optionally adds the
# local hostname as serverHost, and builds the addEvents URI.
#
# conf - the configuration element for this plugin (responds to #elements).
def configure( conf )
  if conf.elements('buffer').empty?
    $log.warn "Pre 0.14.0 configuration file detected. Please consider updating your configuration file"
  end

  compat_parameters_buffer( conf, default_chunk_key: '' )

  super

  if @buffer.chunk_limit_size > 1024*1024
    $log.warn "Buffer chunk size is greater than 1Mb. This may result in requests being rejected by Scalyr"
  end

  if @max_request_buffer > (1024*1024*3)
    $log.warn "Maximum request buffer > 3Mb. This may result in requests being rejected by Scalyr"
  end

  @message_encoding = nil
  if @force_message_encoding.to_s != ''
    begin
      @message_encoding = Encoding.find( @force_message_encoding )
      $log.debug "Forcing message encoding to '#{@force_message_encoding}'"
    rescue ArgumentError
      $log.warn "No encoding '#{@force_message_encoding}' found. Ignoring"
    end
  end

  # Evaluate string values of server_attributes that are exactly "#{expr}".
  # Every other value (plain strings and non-strings alike) is kept unchanged;
  # previously non-String values were silently dropped.
  # NOTE(security): this evals configuration text. It is only as trustworthy as
  # whoever can edit the Fluentd config; never route untrusted input here.
  if @server_attributes
    @server_attributes = @server_attributes.each_with_object({}) do |(key, value), attributes|
      # \A and \z anchor the whole value; ^ and $ would also match inside a
      # multi-line string and eval just one of its lines
      m = value.is_a?( String ) ? /\A\#{(.*)}\z/.match( value ) : nil
      attributes[key] = m ? eval( m[1] ) : value
    end
  end

  # See if we should use the hostname as the server_attributes.serverHost
  if @use_hostname_for_serverhost
    @server_attributes ||= {}

    # only set serverHost if it doesn't currently exist in server_attributes
    # Note: Use strings rather than symbols for the key, because keys coming
    # from the config file will be strings
    @server_attributes['serverHost'] = Socket.gethostname unless @server_attributes.key?( 'serverHost' )
  end

  # Build a new string rather than appending in place: the configured value
  # may be frozen, and String#<< would raise on it.
  @scalyr_server += '/' unless @scalyr_server.end_with?('/')
  @add_events_uri = URI( @scalyr_server + "addEvents" )

  # NOTE(review): limiting @buffer_config.flush_thread_count to 1 (to ensure
  # requests always have incrementing timestamps) is currently disabled.
end
create_request( events, current_threads )
click to toggle source
# File lib/fluent/plugin/out_scalyr_threaded.rb, line 417
# Assembles one Scalyr addEvents request from a batch of events.
#
# events          - Array of event hashes (:thread, :ts, :attrs).
# current_threads - Hash of tag => thread id for the threads used by +events+.
#
# Returns a Hash with :body, the JSON request body (token, client timestamp in
# milliseconds, session id, events, thread descriptors and, when
# server_attributes is set, sessionInfo), and :record_count, the number of
# events in the batch.
def create_request( events, current_threads )
  thread_descriptors = current_threads.map do |tag, id|
    { :id => id.to_s, :name => "Fluentd: #{tag}" }
  end

  payload = {
    :token => @api_write_token,
    :client_timestamp => to_millis( Fluent::Engine.now ).to_s,
    :session => @session,
    :events => events,
    :threads => thread_descriptors
  }
  # server_attributes, when present, are reported as the session info
  payload[:sessionInfo] = @server_attributes if @server_attributes

  { :body => payload.to_json, :record_count => events.size }
end
format( tag, time, record )
click to toggle source
# File lib/fluent/plugin/out_scalyr_threaded.rb, line 154
# Normalizes one record and serializes it for the buffer as the msgpack of
# [tag, sec, nsec, record].
#
# - A nil +time+ becomes Fluent::Engine.now; Integer and Float times become
#   Fluent::EventTime (a Float's fraction becomes truncated nanoseconds).
# - When message_field is not "message", that field is renamed to "message"
#   (with a warning if an existing "message" is overwritten).
# - When force_message_encoding resolved to an encoding and "message" is a
#   String, the message is retagged with that encoding; with
#   replace_invalid_utf8 and UTF-8, invalid byte sequences become "<?>".
#
# Note: +record+ is modified in place, but the message String itself is never
# mutated (it may be frozen or shared).
def format( tag, time, record )
  time = Fluent::Engine.now if time.nil?

  # handle timestamps that are not EventTime types
  if time.is_a?( Integer )
    time = Fluent::EventTime.new( time )
  elsif time.is_a?( Float )
    whole, fraction = time.divmod( 1 ) # integer and fractional components
    time = Fluent::EventTime.new( whole.to_i, (fraction * 10**9).to_i )
  end

  # move the configured message field into "message"
  if @message_field != "message" && record.key?( @message_field )
    if record.key?( "message" )
      $log.warn "Overwriting log record field 'message'. You are seeing this warning because in your fluentd config file you have configured the '#{@message_field}' field to be converted to the 'message' field, but the log record already contains a field called 'message' and this is now being overwritten."
    end
    record["message"] = record.delete( @message_field )
  end

  message = record["message"]
  if @message_encoding && message.is_a?( String )
    record["message"] =
      if @replace_invalid_utf8 && @message_encoding == Encoding::UTF_8
        # String#encode into the string's own encoding is a no-op that keeps
        # invalid bytes, so UTF-8 and binary (ASCII-8BIT) messages are
        # reinterpreted as UTF-8 and scrubbed; other encodings are transcoded.
        if message.encoding == Encoding::UTF_8 || message.encoding == Encoding::ASCII_8BIT
          message.dup.force_encoding( Encoding::UTF_8 ).scrub( "<?>" )
        else
          message.encode( "UTF-8", :invalid => :replace, :undef => :replace, :replace => "<?>" )
        end
      else
        # dup so a frozen message string is never force_encoding'd in place
        message.dup.force_encoding( @message_encoding )
      end
  end

  [tag, time.sec, time.nsec, record].to_msgpack
rescue JSON::GeneratorError
  $log.warn "Unable to format message due to JSON::GeneratorError. Record is:\n\t#{record.to_s}"
  raise
end
formatted_to_msgpack_binary()
click to toggle source
# File lib/fluent/plugin/out_scalyr_threaded.rb, line 66
# Always true: this plugin's #format produces msgpack-encoded data.
def formatted_to_msgpack_binary
  true
end
handle_response( response )
click to toggle source
# File lib/fluent/plugin/out_scalyr_threaded.rb, line 281
# Interprets a Scalyr addEvents HTTP response and raises unless the request
# was accepted.
#
# response - the response returned by post_request (#code is a String).
#
# Raises ScalyrThreaded::Client4xxError for any 4xx response code.
# Raises ScalyrThreaded::ClientError for a status containing "/client/".
# Raises ScalyrThreaded::ServerError for a body without a status, any other
#   non-success status, or a "success" status without HTTP 200.
# A status containing "discardBuffer" only logs a warning.
def handle_response( response )
  $log.debug "Response Code: #{response.code}"
  $log.debug "Response Body: #{response.body}"

  response_hash =
    begin
      JSON.parse( response.body )
    rescue StandardError
      nil
    end

  # An unparseable body, or JSON that is not an object (e.g. an array or
  # null), is reported as an invalid response instead of failing with
  # NoMethodError on the key lookup below.
  unless response_hash.is_a?( Hash )
    response_hash = { "status" => "Invalid JSON response from server" }
  end

  # make sure the JSON response has a "status" field
  unless response_hash.key?( "status" )
    $log.debug "JSON response does not contain status message"
    raise ScalyrThreaded::ServerError, "JSON response does not contain status message"
  end

  status = response_hash["status"]

  # 4xx codes are handled separately
  raise ScalyrThreaded::Client4xxError, status if response.code =~ /^4\d\d/

  # match against a String so non-String statuses don't hit Object#=~
  # (removed in Ruby 3.2)
  status_text = status.to_s
  if status == "success"
    # response code is a string not an int
    unless response.code.include?( "200" )
      raise ScalyrThreaded::ServerError, "Unexpected response code #{response.code} for a 'success' status"
    end
  elsif status_text.include?( "discardBuffer" )
    $log.warn "Received 'discardBuffer' message from server. Buffer dropped."
  elsif status_text =~ %r"/client/"i
    raise ScalyrThreaded::ClientError, status
  else
    # don't check specifically for server, we assume all non-client errors are server errors
    raise ScalyrThreaded::ServerError, status
  end
end
post_request( uri, body )
click to toggle source
# File lib/fluent/plugin/out_scalyr_threaded.rb, line 239
# POSTs +body+ (a JSON string) to +uri+ over TLS and returns the result of
# Net::HTTP#request.
#
# When ssl_verify_peer is set, the peer is verified against
# ssl_ca_bundle_path with ssl_verify_depth. When compression_type is
# 'deflate' or 'bz2', the body is compressed and a matching Content-Encoding
# header is sent. Any other value sends the body uncompressed and with no
# Content-Encoding header.
def post_request( uri, body )
  https = Net::HTTP.new( uri.host, uri.port )
  https.use_ssl = true

  # verify peers to prevent potential MITM attacks
  if @ssl_verify_peer
    https.ca_file = @ssl_ca_bundle_path
    https.verify_mode = OpenSSL::SSL::VERIFY_PEER
    https.verify_depth = @ssl_verify_depth
  end

  # use compression if enabled
  encoding = nil
  case @compression_type
  when 'deflate'
    encoding = 'deflate'
    body = Zlib::Deflate.deflate( body, @compression_level )
  when 'bz2'
    encoding = 'bz2'
    io = StringIO.new
    bz2 = RBzip2.default_adapter::Compressor.new( io )
    bz2.write( body )
    bz2.close
    body = io.string
  end

  post = Net::HTTP::Post.new( uri.path )
  post.add_field( 'Content-Type', 'application/json' )
  # Only advertise an encoding that was actually applied. An unrecognized
  # compression_type leaves the body uncompressed, so no header is sent
  # (add_field with nil would emit an empty Content-Encoding).
  post.add_field( 'Content-Encoding', encoding ) if encoding
  post.body = body

  https.request( post )
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_scalyr_threaded.rb, line 140
# Starts the plugin. Runs the superclass #start, logs the plugin id, and
# generates a new session id. It also (re)creates the mutex and the
# thread-id/timestamp state that build_add_events_body reads and updates
# under that mutex.
def start
  super
  $log.info "Scalyr Threaded Fluentd Plugin ID - #{plugin_id}"

  # Generate a session id. This will be called once for each <match> in fluent.conf that uses scalyr
  @session = SecureRandom.uuid

  # Everything below @sync is guarded by it.
  @sync = Mutex.new
  @thread_ids = {}    # tag => Scalyr thread id
  @next_id = 1        # next thread id to hand out in this session
  @last_timestamp = 0 # most recent event timestamp handed out, in nanoseconds
end
to_millis( timestamp )
click to toggle source
Explicit helper that converts a timestamp (seconds plus nanoseconds) to whole milliseconds. Keeping the conversion in one function makes it easier to maintain if Fluentd's timestamp resolution changes.
# File lib/fluent/plugin/out_scalyr_threaded.rb, line 235
# Converts a timestamp responding to #sec and #nsec into whole milliseconds.
# The sub-millisecond part of #nsec is dropped by integer division.
def to_millis( timestamp )
  millis_from_seconds = timestamp.sec * 1_000
  millis_from_seconds + timestamp.nsec / 1_000_000
end
to_nanos( seconds, nsec )
click to toggle source
Explicit helper that combines seconds and nanoseconds into a single nanosecond count. Keeping the conversion in one function makes it easier to maintain if Fluentd's timestamp resolution changes.
# File lib/fluent/plugin/out_scalyr_threaded.rb, line 229
# Combines a seconds value and a nanoseconds value into one nanosecond count.
def to_nanos( seconds, nsec )
  nanos_per_second = 1_000_000_000
  seconds * nanos_per_second + nsec
end
write( chunk )
click to toggle source
Called by Fluentd when a buffer chunk of log messages is ready to be sent.
# File lib/fluent/plugin/out_scalyr_threaded.rb, line 197
# Sends one buffer chunk to Scalyr as the addEvents requests produced by
# build_add_events_body, posting them in order.
#
# Per request, an OpenSSL::SSL::SSLError is logged (with a hint about
# ssl_ca_bundle_path when certificate verification failed). A
# ScalyrThreaded::Client4xxError is logged with request details. Neither is
# re-raised, and the loop moves on to the next request. Any other error from
# post_request or handle_response propagates to the caller. A
# JSON::GeneratorError is logged and re-raised.
def write( chunk )
  $log.debug "Size of chunk is: #{chunk.size}"
  requests = build_add_events_body( chunk )
  request_total = requests.size
  $log.debug "Chunk split into #{request_total} request(s)."

  requests.each.with_index( 1 ) do |request, position|
    body = request[:body]
    $log.debug "Request #{position}/#{request_total}: #{body.bytesize} bytes"
    begin
      response = post_request( @add_events_uri, body )
      handle_response( response )
    rescue OpenSSL::SSL::SSLError => e
      if e.message.include?( "certificate verify failed" )
        $log.warn "SSL certificate verification failed. Please make sure your certificate bundle is configured correctly and points to a valid file. You can configure this with the ssl_ca_bundle_path configuration option. The current value of ssl_ca_bundle_path is '#{@ssl_ca_bundle_path}'"
      end
      $log.warn e.message
      $log.warn "Discarding buffer chunk without retrying or logging to <secondary>"
    rescue ScalyrThreaded::Client4xxError => e
      $log.warn "4XX status code received for request #{position}/#{request_total}. Discarding buffer without retrying or logging.\n\t#{response.code} - #{e.message}\n\tChunk Size: #{chunk.size}\n\tLog messages this request: #{request[:record_count]}\n\tJSON payload size: #{body.bytesize}\n\tSample: #{body[0, 1024]}..."
    end
  end
rescue JSON::GeneratorError
  $log.warn "Unable to format message due to JSON::GeneratorError."
  raise
end