class LogStash::Inputs::Nats

This input plugin reads events from a NATS instance; it does not support NATS Streaming instances. The plugin uses the following Ruby NATS client: github.com/nats-io/ruby-nats

For more information about NATS, see <nats.io>

Examples:

source,ruby

input {

# Read events on subject "example" by using an "url" without authentication
nats {
  url => "nats://localhost:4222"
  subjects => ["example"]
}

}

source,ruby

input {

# Read events on subject "example" by using an "url" with authentication
nats {
  url => "nats://user:passwd@localhost:4222"
  subjects => ["example"]
}

}

source,ruby

input {

# Read events on two subjects by using other parameters
nats {
  host => "localhost"
  port => 4222
  user => "user"
  pass => "password"
  subjects => [ "first", "second" ]
}

}

Public Instance Methods

register() click to toggle source
# File lib/logstash/inputs/nats.rb, line 104
# Prepares the connection settings used later by the input.
#
# Stores the server URI produced by +build_nats_server+ in +@nats_server+
# and assembles the options hash kept in +@nats_config+.
#
# @return [Hash] the options hash assigned to +@nats_config+
def register
  @nats_server = build_nats_server

  # Both reconnect settings are optional wrapper objects; only unwrap them
  # (via +value+) when they were actually provided, otherwise pass nil.
  reconnect_wait = @reconnect_time_wait&.value
  reconnect_attempts = @max_reconnect_attempts&.value

  @nats_config = {
    uri: @nats_server,
    ssl: @ssl,
    pedantic: @pedantic,
    verbose: @verbose,
    reconnect_time_wait: reconnect_wait,
    max_reconnect_attempts: reconnect_attempts
  }
end
run(queue) click to toggle source
# File lib/logstash/inputs/nats.rb, line 119
# Connects to NATS with +@nats_config+ and subscribes to every entry of
# +@subjects+, pushing each decoded event onto +queue+.
#
# Each received message is decoded by +@codec+; every resulting event is
# passed to +decorate+, tagged with a "nats_subject" field and appended to
# +queue+.
#
# NOTE(review): this method installs TERM/INT handlers and an error handler
# that all terminate the process with +exit!+ - confirm this is intended
# for code running inside a host process.
#
# @param queue [#<<] destination that receives the decoded events
def run(queue)
  # On TERM or INT: print an empty line, then leave immediately.
  %w[TERM INT].each do |signal_name|
    trap(signal_name) do
      puts
      exit!
    end
  end

  # Any error reported through NATS.on_error is printed and ends the process.
  NATS.on_error do |err|
    puts "Server Error: #{err}"
    exit!
  end

  NATS.start(@nats_config) do
    @subjects.each do |subject|
      puts "Listening on [#{subject}]"
      # Three block parameters are kept on purpose: the second one is unused,
      # the third is presumably the subject the message arrived on - verify
      # against the client's callback contract.
      NATS.subscribe(subject, queue: @queue_group) do |payload, _unused, received_on|
        @codec.decode(payload) do |event|
          decorate(event)
          event.set("nats_subject", received_on)
          queue << event
        end
      end
    end
  end
end

Private Instance Methods

build_nats_server() click to toggle source
# File lib/logstash/inputs/nats.rb, line 140
# Determines the server URI to connect to.
#
# When +@url+ is set it wins: a warning is logged and +@url+ is returned
# unchanged. Otherwise the URI is assembled from +@host+ and +@port+, with
# credentials embedded only when both +@user+ and +@pass+ are present.
#
# @return [String] the NATS server URI
def build_nats_server
  unless @url.nil?
    @logger.warn("Parameter 'url' is set, ignoring connection parameters: 'host', 'port', 'user' and 'pass'")
    return @url
  end

  # Credentials are all-or-nothing: a missing user or password means an
  # unauthenticated URI.
  return "nats://#{@host}:#{@port}" if @user.nil? || @pass.nil?

  "nats://#{@user}:#{@pass.value}@#{@host}:#{@port}"
end