class Fluent::Plugin::RedisSlowlogInput

Constants

Entry

Attributes

interval[R]
redis[R]
watcher[RW]
watching[RW]

Public Instance Methods

configure(conf)
Calls superclass method
# File lib/fluent/plugin/in_redis_slowlog.rb, line 23
# Configures the plugin and builds the Redis client it will poll.
#
# The superclass is configured first. The client is then created from the
# url, host, port, path, password and timeout values read on this plugin
# and stored in @redis.
#
# @param conf the configuration object, passed through to the superclass
# @return the newly created Redis client
def configure(conf)
  super

  connection_settings = {
    url: url,
    host: host,
    port: port,
    path: path,
    password: password,
    timeout: timeout
  }
  @redis = Redis.new(**connection_settings)
end
shutdown()
Calls superclass method
# File lib/fluent/plugin/in_redis_slowlog.rb, line 48
# Stops the plugin: runs the superclass shutdown, clears the watching flag
# and calls quit on the Redis client.
def shutdown
  super

  # The polling loop in #watch re-checks this flag before every pass, so
  # clearing it lets that loop finish.
  self.watching = false
  redis.quit
end
start()
Calls superclass method
# File lib/fluent/plugin/in_redis_slowlog.rb, line 36
# Starts the plugin: runs the superclass start, verifies that Redis answers
# a ping, then spawns the watcher thread that runs #watch.
#
# @raise [Redis::CannotConnectError] when the ping reply is not "PONG"
def start
  super

  ping_reply = redis.ping
  raise Redis::CannotConnectError, "Could not connect to redis" unless ping_reply == "PONG"

  # The flag must be set before the thread is created, since #watch loops
  # only while it is truthy.
  self.watching = true
  poller = method(:watch)
  self.watcher = thread_create(:redis_slowlog_watcher, &poller)
end

Private Instance Methods

emit_slowlog(slowlogs, last_id)
# File lib/fluent/plugin/in_redis_slowlog.rb, line 93
# Emits one event per slowlog entry whose id is greater than last_id.
#
# The entries are walked in reverse order of the given array. Each event is
# emitted under the plugin's tag, timestamped with the entry's own timestamp
# (whole seconds), and carries the entry's id, an ISO 8601 UTC time string,
# the execution time and the command.
#
# @param slowlogs entries responding to id, timestamp, exec_time_us, command
# @param last_id  id of the last entry that was already emitted
# @return the slowlogs collection that was passed in
def emit_slowlog(slowlogs, last_id)
  slowlogs.reverse_each do |entry|
    # Entries at or below last_id were emitted on an earlier pass.
    next if entry.id <= last_id

    seconds = entry.timestamp.to_i
    record = {
      "id" => entry.id,
      "time" => Time.at(seconds).utc.iso8601(3),
      "exec_time" => entry.exec_time_us,
      "command" => entry.command
    }
    router.emit(tag, Fluent::EventTime.new(seconds), record)
  end
end
get_slowlogs(size)
# File lib/fluent/plugin/in_redis_slowlog.rb, line 106
# Issues a slowlog "get" for +size+ entries on the Redis client and wraps
# each raw reply in an Entry. Only the first four fields of a raw entry are
# passed to Entry; any further fields are dropped.
#
# @param size number of entries to request
# @return [Array<Entry>] one Entry per raw entry, in the order returned
def get_slowlogs(size)
  raw_entries = redis.slowlog("get", size)
  raw_entries.map do |fields|
    Entry.new(*fields.take(4))
  end
end
output(last_id)
# File lib/fluent/plugin/in_redis_slowlog.rb, line 78
# Fetches the current slowlog entries and emits those newer than last_id.
#
# @param last_id id of the newest entry emitted so far
# @return the id of the newest entry now seen, or last_id unchanged when the
#   slowlog is empty
def output(last_id)
  entries = get_slowlogs(logsize)
  return last_id if entries.empty?

  # The first entry is the most recent one.
  newest_id = entries.first.id

  # A newest id below the one we last saw means redis was restarted, so
  # logging starts over from the beginning.
  emit_from = newest_id < last_id ? -1 : last_id
  emit_slowlog(entries, emit_from)

  newest_id
end
watch()
# File lib/fluent/plugin/in_redis_slowlog.rb, line 60
# Polling loop run by the watcher thread.
#
# Starts from the id of the most recent slowlog entry (or -1 when there is
# none), then, while the watching flag is set, sleeps for the configured
# interval and hands new entries to #output. A Redis error during a pass is
# logged, emitted as an event under "<tag>.error", and polling continues
# with the last known id.
def watch
  # Only entries that appear after this point are reported.
  newest = get_slowlogs(1).first
  current_log_id = newest&.id || -1

  while watching
    begin
      sleep interval

      current_log_id = output(current_log_id)
    rescue Redis::BaseError => e
      # Report the failure both to the plugin log and downstream, then fall
      # through to the next pass of the loop.
      msg = "Error fetching slowlogs: #{e.inspect}"
      log.error(msg)
      router.emit("#{tag}.error", Fluent::EventTime.new(Time.now.to_i), { "message" => msg })
    end
  end
end