class LogStash::Inputs::Nats
This input plugin reads events from a NATS instance; it does not support NATS Streaming instances. This plugin uses the following Ruby NATS client: github.com/nats-io/ruby-nats
For more information about NATS, see <nats.io>
Examples:
- source,ruby
-
input {
  # Read events on subject "example" by using a "url" without authentication
  nats {
    url => "nats://localhost:4222"
    subjects => ["example"]
  }
}
- source,ruby
-
input {
  # Read events on subject "example" by using a "url" with authentication
  nats {
    url => "nats://user:passwd@localhost:4222"
    subjects => ["example"]
  }
}
- source,ruby
-
input {
  # Read events on two subjects by using other parameters
  nats {
    host => "localhost"
    port => 4222
    user => "user"
    pass => "password"
    subjects => [ "first", "second" ]
  }
}
Public Instance Methods
register()
click to toggle source
# File lib/logstash/inputs/nats.rb, line 104
#
# Resolves the server URI and assembles the option hash that #run hands to
# NATS.start. Nothing is connected here; only instance state is prepared.
#
# Sets @nats_server and @nats_config, and returns the config hash.
def register
  @nats_server = build_nats_server

  # Both reconnect settings are optional: a local assigned through a
  # false `unless` modifier stays nil, so an unset option yields nil.
  wait_between_attempts = @reconnect_time_wait.value unless @reconnect_time_wait.nil?
  attempt_limit = @max_reconnect_attempts.value unless @max_reconnect_attempts.nil?

  @nats_config = {
    :uri => @nats_server,
    :ssl => @ssl,
    :pedantic => @pedantic,
    :verbose => @verbose,
    :reconnect_time_wait => wait_between_attempts,
    :max_reconnect_attempts => attempt_limit
  }
end
run(queue)
click to toggle source
# File lib/logstash/inputs/nats.rb, line 119
#
# Starts the NATS client with @nats_config, subscribes to every entry of
# @subjects (passing @queue_group as the :queue option) and pushes each
# decoded event onto +queue+.
#
# Status and error messages go through @logger, like the rest of the plugin,
# instead of being written to stdout with `puts`.
#
# queue - the object events are appended to with `<<`.
def run(queue)
  # NOTE(review): these handlers replace whatever TERM/INT handlers the host
  # process installed, and `exit!` terminates immediately without running
  # at_exit hooks. Kept as-is so shutdown behaviour does not change; consider
  # removing them in favour of a `stop` implementation. The bare `puts` stays
  # because logging from inside a signal handler is not generally safe.
  %w[TERM INT].each { |signal| trap(signal) { puts; exit! } }

  NATS.on_error do |err|
    @logger.error("Server Error: #{err}")
    # NOTE(review): hard-exits the whole process on any client error.
    exit!
  end

  NATS.start(@nats_config) do
    @subjects.each do |subject|
      @logger.info("Listening on [#{subject}]")

      # The third block argument is stored as "nats_subject"; presumably it is
      # the concrete subject the message arrived on — confirm against the
      # client documentation.
      NATS.subscribe(subject, :queue => @queue_group) do |msg, _reply, sub|
        @codec.decode(msg) do |event|
          decorate(event)
          event.set("nats_subject", sub)
          queue << event
        end
      end
    end
  end
end
Private Instance Methods
build_nats_server()
click to toggle source
# File lib/logstash/inputs/nats.rb, line 140
#
# Builds the NATS server URI used for the connection.
#
# * When @url is set it is returned unchanged and a warning notes that the
#   individual connection parameters are ignored.
# * Otherwise the URI is assembled from @host and @port, with credentials
#   embedded only when both @user and @pass are present.
# * When exactly one of @user / @pass is set, the credential cannot be used;
#   a warning is logged so the unauthenticated connection is not silent.
#
# Returns the URI (the value of @url, or a "nats://..." String).
def build_nats_server
  unless @url.nil?
    @logger.warn("Parameter 'url' is set, ignoring connection parameters: 'host', 'port', 'user' and 'pass'")
    return @url
  end

  user_missing = @user.nil?
  pass_missing = @pass.nil?

  if user_missing != pass_missing
    @logger.warn("Only one of 'user' and 'pass' is set, connecting without credentials")
  end

  return "nats://#{@host}:#{@port}" if user_missing || pass_missing

  "nats://#{@user}:#{@pass.value}@#{@host}:#{@port}"
end