class Fluent::Plugin::UtmpxInput

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_utmpx.rb, line 43
# Applies the plugin configuration and resets the reader state.
#
# Also records this plugin's id against @pos_file in the :in_utmpx variable
# store, so that a second in_utmpx instance configured with the same
# pos_file is rejected.
#
# @param conf the configuration element, passed through to the superclass
# @raise [Fluent::ConfigError] when another in_utmpx instance has already
#   registered the same pos_file (this check is skipped when
#   called_in_test? is true)
def configure(conf)
  # The store is fetched before super, as in the original ordering.
  @variable_store = Fluent::VariableStore.fetch_or_build(:in_utmpx)
  super

  @utmpx = Linux::Utmpx::UtmpxParser.new
  @buffer = ""
  @tail_position = @previous_position = 0

  pos_file_taken = @variable_store.key?(@pos_file) && !called_in_test?
  if pos_file_taken
    owner_id = @variable_store[@pos_file]
    raise Fluent::ConfigError, "Other 'in_utmpx' plugin already use same pos_file path: plugin_id = #{owner_id}, pos_file path = #{@pos_file}"
  end

  @variable_store[@pos_file] = plugin_id
end
refresh_watchers() click to toggle source
# File lib/fluent/plugin/in_utmpx.rb, line 75
# Reads the utmpx records appended to @path since the stored read position
# and emits them as one event stream under @tag.
#
# Registered as the timer callback in #start.
#
# - No new bytes: returns without doing anything.
# - File is shorter than the stored position: the position is reset to 0, a
#   warning is logged, and reading is left to the next invocation.
# - Fewer bytes than one full record are pending: returns and waits for the
#   remainder instead of emitting an empty stream.
def refresh_watchers
  # Stat once so that size and inode describe the same state of the file.
  stat = Fluent::FileWrapper.stat(@path)
  @tail_position = stat.size
  @pe = if Gem::Version.new(Fluent::VERSION) < Gem::Version.new("1.12.0")
          @pf[@path]
        else
          @pf[TargetInfo.new(@path, stat.ino)]
        end

  start_pos = @pe.read_pos
  unread_bytes = @tail_position - start_pos
  return if unread_bytes == 0

  if unread_bytes < 0
    # may be truncated, read from head
    @pe.update_pos(0)
    log.warn("#{@path} may be truncated")
    return
  end

  record_size = @utmpx.num_bytes
  # Integer division: a trailing partial record is left for a later run.
  count = unread_bytes / record_size
  return if count == 0

  es = MultiEventStream.new
  # Records are fixed-size binary structures, so read in binary mode.
  File.open(@path, "rb") do |io|
    io.seek(start_pos)
    count.times do
      time, record = parse_entry(@utmpx.read(io))
      es.add(time, record)
    end
  end

  # Emit before persisting the new position: if emit_stream raises, the
  # position is untouched and the same records are read again next time
  # instead of being skipped.
  router.emit_stream(@tag, es)
  @pe.update_pos(start_pos + count * record_size)
end
shutdown() click to toggle source
# File lib/fluent/plugin/in_utmpx.rb, line 104
# Closes the position file (if #start opened one) and then hands over to
# the superclass.
#
# configure and start in this class both chain to super; shutdown does the
# same so the parent's shutdown handling is not silently skipped.
def shutdown
  @pf_file.close if @pf_file

  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_utmpx.rb, line 58
# Opens (creating if necessary) the position file, loads the position
# table from it, and schedules #refresh_watchers to run every @interval.
#
# The directory containing @pos_file is created when it does not exist.
# PositionFile.load is called with the signature matching the running
# Fluentd version: before 1.12.0 it takes only the file, from 1.12.0 on it
# also takes a map of target infos.
def start
  super

  pos_file_dir = File.dirname(@pos_file)
  FileUtils.mkdir_p(pos_file_dir, mode: Fluent::DEFAULT_DIR_PERMISSION) unless Dir.exist?(pos_file_dir)
  @pf_file = File.open(@pos_file, File::RDWR|File::CREAT|File::BINARY, Fluent::DEFAULT_FILE_PERMISSION)
  @pf_file.sync = true
  if Gem::Version.new(Fluent::VERSION) < Gem::Version.new("1.12.0")
    @pf = TailInput::PositionFile.load(@pf_file, logger: log)
  else
    # TargetInfo is only referenced on this branch (as in refresh_watchers),
    # so the pre-1.12.0 branch neither needs the constant nor stats @path.
    target_info = TargetInfo.new(@path, Fluent::FileWrapper.stat(@path).ino)
    @pf = TailInput::PositionFile.load(@pf_file, false, {target_info.path => target_info}, logger: log)
  end

  timer_execute(:execute_utmpx, @interval, &method(:refresh_watchers))
end

Private Instance Methods

parse_entry(entry) click to toggle source
# File lib/fluent/plugin/in_utmpx.rb, line 110
# Converts one parsed utmpx entry into a [time, record] pair.
#
# The record is a Hash with the symbol keys :user, :type, :pid, :line and
# :host. The pid is converted with to_i; the other values are dup'ed copies
# of what the entry returns.
#
# @param entry an object responding to user, type, pid, line, host and time
# @return [Array] two elements: entry.time and the record Hash
def parse_entry(entry)
  fields = {}
  fields[:user] = entry.user.dup
  fields[:type] = entry.type.dup
  fields[:pid] = entry.pid.to_i
  fields[:line] = entry.line.dup
  fields[:host] = entry.host.dup
  [entry.time, fields]
end