class RedisStreamLogger::LogDevice
Attributes
Public Class Methods
Creates a new LogDevice
that can be used as a sink for Ruby Logger
@param [Redis] conn connection to Redis @param [String] stream name of key to write to
# File lib/redis_stream_logger/log_device.rb, line 12 def initialize(conn = nil, stream: 'rails-log') @config = Config.new @closed = false # Just in case a whole new config is passed in like in the Railtie new_conf = yield @config if block_given? @config = new_conf if new_conf.is_a?(Config) @config.connection ||= conn @config.stream_name ||= stream raise ArgumentError, 'must provide connection' if @config.connection.nil? @q = Queue.new start end
Public Instance Methods
# File lib/redis_stream_logger/log_device.rb, line 36 def close return if @closed @q.push :exit @ticker.exit @writer.join @config.connection.close @closed = true end
# File lib/redis_stream_logger/log_device.rb, line 30 def reopen(log = nil) close @config.connection._client.connect start end
# File lib/redis_stream_logger/log_device.rb, line 26 def write(msg) @q.push msg end
Private Instance Methods
# File lib/redis_stream_logger/log_device.rb, line 105 def control_msg?(msg) msg == :nudge || msg == :exit end
# File lib/redis_stream_logger/log_device.rb, line 59 def send_options return {} if @config.max_len.nil? { maxlen: @config.max_len, approximate: true } end
# File lib/redis_stream_logger/log_device.rb, line 47 def start @closed = false @error_logger = ::Logger.new(STDERR) @ticker = Thread.new do ticker(@config.send_interval) end @writer = Thread.new do writer(@config.buffer_size, @config.send_interval) end at_exit { close } end
Stores the name of the logger in the configured set so other tools can locate the list of available log streams
# File lib/redis_stream_logger/log_device.rb, line 114 def store_logger_name @config.connection.sadd(@config.log_set_key, @config.stream_name) rescue StandardError => exception @error_logger.warn "unable to store name of log: #{exception}" end
Pushes a message into the queue at the given interval to wake the writer thread up to ensure it sends partial buffers if no new logs come in.
@param [Integer] interval to wake the writer up on
# File lib/redis_stream_logger/log_device.rb, line 98 def ticker(interval) loop do sleep(interval) @q.push(:nudge) end end
Writes a batch of log lines to the Redis stream
@param [Array<String>] messages to write to the stream
# File lib/redis_stream_logger/log_device.rb, line 71 def write_batch(messages) redis = @config.connection opt = send_options messages.each_slice(@config.batch_size) do attempt = 0 begin redis.pipelined do messages.each do |msg| redis.xadd(@config.stream_name, {m: msg}, **opt) end end rescue StandardError => exception attempt += 1 retry if attempt <= 3 @error_logger.warn "unable to write redis logs: #{exception}" messages.each { |m| @error_logger.info(m) } end end end
Used in a thread to pull log messages from a queue and store them in batches into a redis stream.
@param [Integer] buffer_max maximum number of log entries to buffer before sending @param [Integer] interval maximum amount of time to wait before sending a partial buffer
# File lib/redis_stream_logger/log_device.rb, line 128 def writer(buffer_max, interval) last_sent = Time.now buffered = [] store_logger_name loop do msg = @q.pop buffered.push(msg) unless control_msg?(msg) now = Time.new if buffered.count >= buffer_max || (now - last_sent) > interval || msg == :exit write_batch(buffered) return if msg == :exit last_sent = Time.now buffered = [] end end end