class Fluent::Plugin::UtmpxInput
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_utmpx.rb, line 43
# Prepares parser and position state, then claims @pos_file for this
# plugin instance in the shared :in_utmpx variable store.
#
# @param conf the configuration object; it is consumed by the superclass
#   implementation via the implicit-argument +super+ call
# @raise [Fluent::ConfigError] when the store already holds an entry for
#   @pos_file and called_in_test? is false
def configure(conf)
  @variable_store = Fluent::VariableStore.fetch_or_build(:in_utmpx)
  super

  @utmpx = Linux::Utmpx::UtmpxParser.new
  @buffer = ""
  @tail_position = @previous_position = 0

  # The store is keyed by pos_file path; an existing key means the path
  # was already registered under some plugin id.
  pos_file_taken = @variable_store.key?(@pos_file) && !called_in_test?
  if pos_file_taken
    owner_id = @variable_store[@pos_file]
    message = "Other 'in_utmpx' plugin already use same pos_file path: plugin_id = #{owner_id}, pos_file path = #{@pos_file}"
    raise Fluent::ConfigError, message
  end

  @variable_store[@pos_file] = plugin_id
end
refresh_watchers()
click to toggle source
# File lib/fluent/plugin/in_utmpx.rb, line 75
# Reads the entries appended to @path since the stored read position and
# emits them as one event stream under @tag.
#
# Does nothing when the file size equals the stored position or when the
# pending bytes do not yet make up one whole entry. When the file is
# smaller than the stored position, the position is reset to 0 and a
# warning is logged; reading resumes on a later call.
def refresh_watchers
  # Stat once so the size and the inode describe the same file state.
  stat = Fluent::FileWrapper.stat(@path)
  @tail_position = stat.size
  @pe = if Gem::Version.new(Fluent::VERSION) < Gem::Version.new("1.12.0")
          @pf[@path]
        else
          @pf[TargetInfo.new(@path, stat.ino)]
        end

  unread_bytes = @tail_position - @pe.read_pos
  return if unread_bytes == 0

  if unread_bytes < 0
    # may be truncated, read from head
    @pe.update_pos(0)
    log.warn("#{@path} may be truncated")
    return
  end

  # Integer division: a trailing partial entry is left for a later call.
  count = unread_bytes / @utmpx.num_bytes
  # Skip the position write and the empty emit when no whole entry exists.
  return if count == 0

  es = MultiEventStream.new
  File.open(@path) do |io|
    io.seek(@pe.read_pos)
    count.times do
      time, record = parse_entry(@utmpx.read(io))
      es.add(time, record)
    end
    @pe.update_pos(@pe.read_pos + count * @utmpx.num_bytes)
    router.emit_stream(@tag, es)
  end
end
shutdown()
click to toggle source
# File lib/fluent/plugin/in_utmpx.rb, line 104
# Closes the position file handle (if one was opened) and then runs the
# superclass shutdown step.
#
# The +super+ call is needed so that the shutdown logic of the parent
# class and of any mixed-in modules is not skipped by this override.
def shutdown
  @pf_file.close if @pf_file

  super
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_utmpx.rb, line 58
# Opens (creating if needed) the position file, loads the position table
# from it, and schedules refresh_watchers to run every @interval.
#
# The directory of @pos_file is created with
# Fluent::DEFAULT_DIR_PERMISSION when it does not exist yet.
def start
  super

  pos_file_dir = File.dirname(@pos_file)
  FileUtils.mkdir_p(pos_file_dir, mode: Fluent::DEFAULT_DIR_PERMISSION) unless Dir.exist?(pos_file_dir)
  @pf_file = File.open(@pos_file, File::RDWR|File::CREAT|File::BINARY, Fluent::DEFAULT_FILE_PERMISSION)
  @pf_file.sync = true

  if Gem::Version.new(Fluent::VERSION) < Gem::Version.new("1.12.0")
    @pf = TailInput::PositionFile.load(@pf_file, logger: log)
  else
    # Only this call signature takes a TargetInfo, so build it (and stat
    # the watched file) here rather than unconditionally; this mirrors the
    # version branch in refresh_watchers.
    target_info = TargetInfo.new(@path, Fluent::FileWrapper.stat(@path).ino)
    @pf = TailInput::PositionFile.load(@pf_file, false, {target_info.path => target_info}, logger: log)
  end

  timer_execute(:execute_utmpx, @interval, &method(:refresh_watchers))
end
Private Instance Methods
parse_entry(entry)
click to toggle source
# File lib/fluent/plugin/in_utmpx.rb, line 110
# Converts one parsed entry into a [time, record] pair.
#
# @param entry an object responding to user, type, pid, line, host and time
# @return [Array] entry.time followed by a Hash with the symbol keys
#   :user, :type, :pid, :line and :host; pid is converted with to_i and
#   the other values are dup'ed copies
def parse_entry(entry)
  fields = {}
  fields[:user] = entry.user.dup
  fields[:type] = entry.type.dup
  fields[:pid] = entry.pid.to_i
  fields[:line] = entry.line.dup
  fields[:host] = entry.host.dup
  return entry.time, fields
end