class LogStash::Inputs::DeadLetterQueue
Logstash input to read events from Logstash's dead letter queue
- source, sh
input {
dead_letter_queue { path => "/var/logstash/data/dead_letter_queue" timestamp => "2017-04-04T23:40:37" }
}
Public Instance Methods
register()
click to toggle source
# File lib/logstash/inputs/dead_letter_queue.rb, line 41 def register if @sincedb_path.nil? datapath = File.join(LogStash::SETTINGS.get_value("path.data"), "plugins", "inputs", "dead_letter_queue", @pipeline_id) # Ensure that the filepath exists before writing, since it's deeply nested. FileUtils::mkdir_p datapath @sincedb_path = File.join(datapath, ".sincedb_" + Digest::MD5.hexdigest(@path)) elsif File.directory?(@sincedb_path) raise ArgumentError.new("The \"sincedb_path\" argument must point to a file, received a directory: \"#{@sincedb_path}\"") end dlq_path = java.nio.file.Paths.get(File.join(@path, @pipeline_id)) sincedb_path = @sincedb_path ? java.nio.file.Paths.get(@sincedb_path) : nil start_timestamp = @start_timestamp ? org.logstash.Timestamp.new(@start_timestamp) : nil @inner_plugin = org.logstash.input.DeadLetterQueueInputPlugin.new(dlq_path, @commit_offsets, sincedb_path, start_timestamp) @inner_plugin.register end
run(logstash_queue)
click to toggle source
# File lib/logstash/inputs/dead_letter_queue.rb, line 59 def run(logstash_queue) @inner_plugin.run do |entry| event = LogStash::Event.new(entry.event.toMap()) event.set("[@metadata][dead_letter_queue][plugin_type]", entry.plugin_type) event.set("[@metadata][dead_letter_queue][plugin_id]", entry.plugin_id) event.set("[@metadata][dead_letter_queue][reason]", entry.reason) event.set("[@metadata][dead_letter_queue][entry_time]", entry.entry_time) decorate(event) logstash_queue << event end end
stop()
click to toggle source
# File lib/logstash/inputs/dead_letter_queue.rb, line 72 def stop @inner_plugin.close end