class Swiftcore::Analogger::Client
Swift::Analogger::Client is the client library for writing logging messages to the Swift Analogger
asynchronous logging server.
To use the Analogger
client, instantiate an instance of the Client
class.
logger = Swift::Analogger::Client.new(:myapplog,'127.0.0.1',12345)
Four arguments are accepted when a new Client
is created. The first is the name of the logging facility that this Client
will write to. The second is the hostname where the Analogger
process is running, and the third is the port number that it is listening on for connections.
The fourth argument is optional. Analogger
can require an authentication key before it will allow logging clients to use its facilities. If the Analogger
that one is connecting to requires an authentication key, it must be passed to the new() call as the fourth argument. If the key is incorrect, the connection will be closed.
If a Client
connects to the Analogger
using a facility that is undefined in the Analogger
, the log messages will still be accepted, but they will be dumped to the default logging destination.
Once connected, the Client
is ready to deliver messages to the Analogger
. To send a messagine, the log() method is used:
logger.log(:debug,"The logging client is now connected.")
The log() method takes two arguments. The first is the severity of the message, and the second is the message itself. The default Analogger
severity levels are the same as in the standard Ruby
Constants
- ConnectionFailureTimeout
- MaxFailureCount
- MaxLengthBytes
- MaxMessageLength
- PersistentQueueLimit
- ReconnectThrottleInterval
Public Class Methods
# File lib/swiftcore/Analogger/Client.rb, line 72 def self.connection_failure_timeout @connection_failure_timeout ||= ConnectionFailureTimeout end
# File lib/swiftcore/Analogger/Client.rb, line 76 def self.connection_failure_timeout=(val) @connection_failure_timeout = val.to_i end
# File lib/swiftcore/Analogger/Client.rb, line 80 def self.max_failure_count @max_failure_count ||= MaxFailureCount end
# File lib/swiftcore/Analogger/Client.rb, line 84 def self.max_failure_count=(val) @max_failure_count = val.to_i end
# File lib/swiftcore/Analogger/Client.rb, line 114 def initialize(service = -"default", host = -"127.0.0.1" , port = 6766, key = nil) @service = service.to_s @key = key @host = host @port = port @socket = nil klass = self.class @connection_failure_timeout = klass.connection_failure_timeout @max_failure_count = klass.max_failure_count @persistent_queue_limit = klass.persistent_queue_limit @destination = nil @reconnection_thread = nil @authenticated = false @total_count = 0 @logfile = nil @swamp_drainer = nil clear_failure connect end
# File lib/swiftcore/Analogger/EMClient.rb, line 95 def self.new(*args) ClientProtocol.connect(*args) end
# File lib/swiftcore/Analogger/Client.rb, line 88 def self.persistent_queue_limit @persistent_queue_limit ||= PersistentQueueLimit end
# File lib/swiftcore/Analogger/Client.rb, line 92 def self.persistent_queue_limit=(val) @persistent_queue_limit = val.to_i end
# File lib/swiftcore/Analogger/Client.rb, line 104 def self.reconnect_throttle_interval @reconnect_throttle_interval ||= ReconnectThrottleInterval end
# File lib/swiftcore/Analogger/Client.rb, line 108 def self.reconnect_throttle_interval=(val) @reconnect_throttle_interval = val.to_i end
# File lib/swiftcore/Analogger/Client.rb, line 96 def self.tmplog @tmplog end
# File lib/swiftcore/Analogger/Client.rb, line 100 def self.tmplog=(val) @tmplog = val end
Public Instance Methods
# File lib/swiftcore/Analogger/Client.rb, line 387 def authenticated? @authenticated end
# File lib/swiftcore/Analogger/Client.rb, line 395 def close @socket.close end
# File lib/swiftcore/Analogger/Client.rb, line 399 def closed? @socket.closed? end
# File lib/swiftcore/Analogger/Client.rb, line 200 def connect @socket = open_connection(@host, @port) authenticate raise FailedToAuthenticate(@host, @port) unless authenticated? clear_failure if there_is_a_swamp? drain_the_swamp else setup_remote_logging end rescue Exception => e register_failure close_connection setup_reconnect_thread unless @reconnection_thread && Thread.current == @reconnection_thread setup_local_logging raise e if fail_connect? end
# File lib/swiftcore/Analogger/Client.rb, line 142 def connection_failure_timeout @connection_failure_timeout end
# File lib/swiftcore/Analogger/Client.rb, line 146 def connection_failure_timeout=(val) @connection_failure_timeout = val.to_i end
# File lib/swiftcore/Analogger/Client.rb, line 58 def log(severity, msg) if @destination == :local _local_log(@service, severity, msg) else _remote_log(@service, severity, msg) end rescue Exception @authenticated = false setup_local_logging setup_reconnect_thread end
# File lib/swiftcore/Analogger/Client.rb, line 150 def max_failure_count @max_failure_count end
# File lib/swiftcore/Analogger/Client.rb, line 154 def max_failure_count=(val) @max_failure_count = val.to_i end
# File lib/swiftcore/Analogger/Client.rb, line 166 def persistent_queue_limit @persistent_queue_limit end
# File lib/swiftcore/Analogger/Client.rb, line 170 def persistent_queue_limit=(val) @persistent_queue_limit = val.to_i end
# File lib/swiftcore/Analogger/Client.rb, line 158 def ram_queue_limit @ram_queue_limit end
# File lib/swiftcore/Analogger/Client.rb, line 162 def ram_queue_limit=(val) @ram_queue_limit = val.to_i end
# File lib/swiftcore/Analogger/Client.rb, line 391 def reconnect connect(@host,@port) end
# File lib/swiftcore/Analogger/Client.rb, line 190 def reconnect_throttle_interval @reconnect_throttle_interval ||= self.class.reconnect_throttle_interval end
# File lib/swiftcore/Analogger/Client.rb, line 194 def reconnect_throttle_interval=(val) @reconnect_throttle_interval = val.to_i end
# File lib/swiftcore/Analogger/Client.rb, line 178 def tmplog @tmplog ||= tmplog_prefix.gsub(/SERVICE/, @service).gsub(/PID/,$$.to_s) end
# File lib/swiftcore/Analogger/Client.rb, line 186 def tmplog=(val) @tmplog = val end
# File lib/swiftcore/Analogger/Client.rb, line 174 def tmplog_prefix File.join(Dir.tmpdir, -"analogger-SERVICE-PID.log") end
# File lib/swiftcore/Analogger/Client.rb, line 182 def tmplogs Dir[tmplog_prefix.gsub(/SERVICE/, @service).gsub(/PID/,-"*")].sort_by {|f| File.mtime(f)} end
# File lib/swiftcore/Analogger/Client.rb, line 138 def total_count @total_count end
Private Instance Methods
# File lib/swiftcore/Analogger/Client.rb, line 330 def _drain_the_swamp # As soon as we start emptying the local log file, ensure that no data # gets missed because of IO buffering. Otherwise, during high rates of # message sending, it is possible to get an EOF on file reading, and # assume all data has been sent, when there are actually records which # are buffered and just haven't been written yet. @logfile && (@logfile.sync = true) tmplogs.each do |logfile| buffer = '' FileTest.exist?(logfile) && File.open(logfile) do |fh| non_blocking_lock_on_file_handle(fh) do # Only one process should read a given file. fh.fdatasync rescue fh.fsync logfile_not_empty = true while logfile_not_empty begin buffer << fh.read_nonblock(8192) unless closed? rescue EOFError logfile_not_empty = false end records = buffer.scan(/^.*?\n/) buffer = buffer[(records.inject(0) {|n, e| n += e.length})..-1] # truncate buffer records.each_index do |n| record = records[n] next if record =~ /^\#/ service, severity, msg = record.split(-":", 3) msg = msg.chomp.gsub(/\x00\x00/, "\n") begin _remote_log(service, severity, msg) rescue # FAIL while draining the swamp. Just reset the buffer from wherever we are, and # keep trying, after a short sleep to allow for recovery. new_buffer = '' records[n..-1].each {|r| new_buffer << r} new_buffer << buffer buffer = new_buffer sleep 1 end end end File.unlink logfile end if tmplog == logfile setup_remote_logging end end end @swamp_drainer = nil rescue Exception => e STDERR.puts "ERROR SENDING LOCALLY SAVED LOGS: #{e}\n#{e.backtrace.inspect}" end
# File lib/swiftcore/Analogger/Client.rb, line 252 def _local_log(service, severity, message) # Convert newlines to a different marker so that log messages can be stuffed onto a single file line. @logfile.flock File::LOCK_EX @logfile.puts "#{service}:#{severity}:#{message.gsub(/\n/,"\x00\x00")}" ensure @logfile.flock File::LOCK_UN end
# File lib/swiftcore/Analogger/Client.rb, line 245 def _remote_log(service, severity, message) @total_count += 1 len = MaxLengthBytes + MaxLengthBytes + service.length + severity.length + message.length + 3 ll = sprintf("%0#{MaxLengthBytes}i%0#{MaxLengthBytes}i", len, len) @socket.write "#{ll}:#{service}:#{severity}:#{message}" end
# File lib/swiftcore/Analogger/Client.rb, line 297 def authenticate begin _remote_log(@service, -"authentication", "#{@key}") response = @socket.gets rescue Exception response = nil end if response && response =~ /accepted/ @authenticated = true else @authenticated = false end end
# File lib/swiftcore/Analogger/Client.rb, line 292 def clear_failure @failed_at = nil @failure_count = 0 end
# File lib/swiftcore/Analogger/Client.rb, line 267 def close_connection @socket.close if @socket and !@socket.closed? end
# File lib/swiftcore/Analogger/Client.rb, line 318 def drain_the_swamp unless @swamp_drainer @swap_drainer = Thread.new { _drain_the_swamp } end end
# File lib/swiftcore/Analogger/Client.rb, line 276 def fail_connect? failed_too_many? || failed_too_long? end
# File lib/swiftcore/Analogger/Client.rb, line 280 def failed? !@failed_at.nil? end
# File lib/swiftcore/Analogger/Client.rb, line 288 def failed_too_long? failed? && ( @failed_at + @connection_failure_timeout ) < Time.now end
# File lib/swiftcore/Analogger/Client.rb, line 284 def failed_too_many? @failure_count > @max_failure_count end
# File lib/swiftcore/Analogger/Client.rb, line 324 def non_blocking_lock_on_file_handle(fh, &block) fh.flock(File::LOCK_EX|File::LOCK_NB) ? yield : false ensure fh.flock File::LOCK_UN end
# File lib/swiftcore/Analogger/Client.rb, line 260 def open_connection(host, port) socket = Socket.new(AF_INET,SOCK_STREAM,0) sockaddr = Socket.pack_sockaddr_in(port,host) socket.connect(sockaddr) socket end
# File lib/swiftcore/Analogger/Client.rb, line 271 def register_failure @failed_at ||= Time.now @failure_count += 1 end
# File lib/swiftcore/Analogger/Client.rb, line 222 def setup_local_logging unless @logfile && !@logfile.closed? @logfile = File.open(tmplog,-"a+") @destination = :local end end
# File lib/swiftcore/Analogger/Client.rb, line 233 def setup_reconnect_thread return if @reconnection_thread @reconnection_thread = Thread.new do while true sleep reconnect_throttle_interval connect rescue nil break if @socket && !closed? end @reconnection_thread = nil end end
# File lib/swiftcore/Analogger/Client.rb, line 229 def setup_remote_logging @destination = :remote end
# File lib/swiftcore/Analogger/Client.rb, line 312 def there_is_a_swamp? tmplogs.each do |logfile| break true if FileTest.exist?(logfile) && File.size(logfile) > 0 end end