class Natsy::Client

The Natsy::Client class provides a basic interface for subscribing to messages by subject and queue, and for replying to those messages. It also logs most of its activity through the configured logger (Natsy::Config.logger).

Public Class Methods

reply_to(subject, queue: nil, &block)

Register a message handler with the Natsy::Client::reply_to method. Pass a subject string as the first argument (either a static subject string or a pattern that matches more than one subject). The queue: option is optional. If you omit it, the queue falls back to the value of Natsy::Config::default_queue, or to nil (no queue) if no default queue has been set.

The result of the given block is serialized with to_json and published in reply to the message. The block is passed two arguments when a message matching the subject is received: data and subject. The data argument is the payload of the message (JSON objects are parsed into string-keyed Hash objects and JSON arrays into Array objects). The subject argument is the subject of the message received (mainly useful when a pattern was specified instead of a static subject string).

@example

Natsy::Client.reply_to("some.subject", queue: "foobar") { |data| "Got it! #{data.inspect}" }

Natsy::Client.reply_to("some.*.pattern") { |data, subject| "Got #{data} on #{subject}" }

Natsy::Client.reply_to("other.subject") do |data|
  if data["foo"] == "bar"
    { is_bar: "Yep!" }
  else
    { is_bar: "No way!" }
  end
end

Natsy::Client.reply_to("subject.in.queue", queue: "barbaz") do
  "My turn!"
end
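
The examples above declare two, one, or zero block parameters, even though the handler is invoked with both data and subject (the source of listen calls the handler with two arguments). This works because ordinary Ruby blocks ignore extra arguments. A lambda passed with & keeps its strict arity, so it would need to accept both. The following is a minimal plain-Ruby sketch of that difference; capture_handler is a hypothetical stand-in for reply_to and is not part of Natsy.

```ruby
# Hypothetical stand-in for reply_to: it only captures the block it is given.
def capture_handler(&block)
  block
end

# A regular block becomes a lenient proc, so extra arguments are dropped.
no_args = capture_handler { "My turn!" }
one_arg = capture_handler { |data| "Got it! #{data.inspect}" }

no_args.call({ "foo" => "bar" }, "some.subject")
one_arg.call({ "foo" => "bar" }, "some.subject")

# A lambda passed with & stays a lambda and enforces its arity.
strict = capture_handler(&->(data) { "Got it! #{data.inspect}" })

begin
  strict.call({ "foo" => "bar" }, "some.subject")
rescue ArgumentError => e
  puts "lambda rejected the call: #{e.message}"
end

# Declaring both parameters makes a lambda safe to call with two arguments.
safe = capture_handler(&->(data, _subject) { "Got it! #{data.inspect}" })
safe.call({ "foo" => "bar" }, "some.subject")
```

If you prefer lambdas or method objects as handlers, declaring both parameters avoids the ArgumentError shown here.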
# File lib/natsy/client.rb, line 60
def reply_to(subject, queue: nil, &block)
  queue = Utils.presence(queue) || Config.default_queue
  queue_desc = " in queue '#{queue}'" if queue
  log("Registering a reply handler for subject '#{subject}'#{queue_desc}", level: :debug)
  register_reply!(subject: subject.to_s, handler: block, queue: queue.to_s)
end
reset!()

**USE WITH CAUTION:** This method (::reset!) clears all subscriptions, stops listening (if started), and kills any active threads.

# File lib/natsy/client.rb, line 131
def reset!
  replies.clear
  stop!
  kill!
end
start!()

Start listening for messages with the Natsy::Client::start! method. This will spin up a non-blocking thread that subscribes to subjects (as specified by invocation(s) of ::reply_to) and waits for messages to come in. When a message is received, the appropriate ::reply_to block will be used to compute a response, and that response will be published.

@example

Natsy::Client.start!

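NOTE: If an error is raised in one of the handlers, Natsy::Client logs it and restarts automatically. The exception is a failure to connect to the NATS server (NATS::ConnectError), which is logged without a restart.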

NOTE: You can invoke ::reply_to to create additional message subscriptions after Natsy::Client.start!, but _each ::reply_to invoked after ::start!_ forces the client to restart and re-subscribe. You may see (benign, already-handled) errors in the logs when this restart happens. If you have many such invocations, consider refactoring so that your call to Natsy::Client.start! occurs after them.

NOTE: The ::start! method can be safely called multiple times. Only the first call is honored; any subsequent call made while the client is already started does nothing except write a “NATS is already running” message to the logger at the DEBUG level.
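
To make the last two notes concrete, here is a deliberately simplified model of the described behavior. MiniClient is hypothetical: it uses no threads and no NATS connection, and it only counts the restarts that late registrations would trigger. It is a sketch of the documented semantics, not Natsy's implementation.

```ruby
# Hypothetical, simplified model of the behavior described in the notes above.
class MiniClient
  attr_reader :restarts, :subjects

  def initialize
    @started = false
    @restarts = 0
    @subjects = []
  end

  def started?
    @started
  end

  # Only the first call has an effect; later calls return early.
  def start!
    return :already_running if started?

    @started = true
    :started
  end

  # Registering after start! costs one restart per call.
  def reply_to(subject)
    @subjects << subject
    @restarts += 1 if started?
  end
end

late = MiniClient.new
late.start!
late.start! # no-op: the client is already started
%w[a.subject b.subject c.subject].each { |subject| late.reply_to(subject) }
puts late.restarts # prints 3, one restart per late registration

early = MiniClient.new
%w[a.subject b.subject c.subject].each { |subject| early.reply_to(subject) }
early.start!
puts early.restarts # prints 0
```

Registering every handler before calling start! keeps the restart count at zero, which is the ordering the note above recommends.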

# File lib/natsy/client.rb, line 95
def start!
  log("Starting NATS", level: :debug)

  if started?
    log("NATS is already running", level: :debug)
    return
  end

  started!

  thread = Thread.new do
    Thread.handle_interrupt(StandardError => :never) do
      Thread.handle_interrupt(StandardError => :immediate) { listen }
    rescue NATS::ConnectError => e
      log("Could not connect to NATS server:", level: :error)
      log(e.full_message, level: :error, indent: 2)
      Thread.current.exit
    rescue NewSubscriptionsError => _e
      log("New subscriptions! Restarting...", level: :info)
      restart!
      Thread.current.exit
      # raise e # TODO: there has to be a better way
    rescue StandardError => e
      log("Encountered an error:", level: :error)
      log(e.full_message, level: :error, indent: 2)
      restart!
      Thread.current.exit
      # raise e
    end
  end

  threads << thread
end
started?()

Returns true if ::start! has already been called (meaning the client is listening to NATS messages). Returns false if it has not yet been called, or if it has been stopped.

# File lib/natsy/client.rb, line 16
def started?
  @started ||= false
end
stopped?()

Opposite of ::started?: returns false if ::start! has already been called (meaning the client is listening to NATS messages). Returns true if it has not yet been called, or if it has been stopped.

# File lib/natsy/client.rb, line 23
def stopped?
  !started?
end

Private Class Methods

current_thread()
# File lib/natsy/client.rb, line 143
def current_thread
  threads.last
end
kill!()
# File lib/natsy/client.rb, line 151
def kill!
  threads.each { |thread| thread.kill if thread.alive? }
end
listen()
# File lib/natsy/client.rb, line 207
def listen
  NATS.start(servers: Natsy::Config.urls) do
    replies.each do |replier|
      queue_desc = " in queue '#{replier[:queue]}'" if replier[:queue]
      log("Subscribing to subject '#{replier[:subject]}'#{queue_desc}", level: :debug)

      NATS.subscribe(replier[:subject], queue: replier[:queue]) do |message, reply_subject, subject|
        parsed_message = begin
          JSON.parse(message)
        rescue StandardError
          message
        end

        id, data, pattern = if parsed_message.is_a?(Hash)
          parsed_message.values_at("id", "data", "pattern")
        else
          [nil, parsed_message, nil]
        end

        message_data = id && data && pattern ? data : parsed_message

        log("Received a message!")
        message_desc = <<~LOG_MESSAGE
          id:      #{id || '(none)'}
          pattern: #{pattern || '(none)'}
          subject: #{subject || '(none)'}
          data:    #{message_data.to_json}
          inbox:   #{reply_subject || '(none)'}
          queue:   #{replier[:queue] || '(none)'}
          message: #{message}
        LOG_MESSAGE
        log(message_desc, indent: 2)

        raw_response = replier[:handler].call(message_data, subject)

        log("Responding with '#{raw_response}'")

        NATS.publish(reply_subject, raw_response.to_json) if Utils.present?(reply_subject)
      end
    end
  end
end
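
The payload handling in listen can be tried in isolation. The sketch below copies only the parsing steps from the source above into a hypothetical helper, extract_message_data (not part of Natsy), to show what a handler would receive as data for a few raw payloads: invalid JSON is passed through as the raw string, and a JSON object carrying truthy "id", "data", and "pattern" keys is unwrapped to its "data" value.

```ruby
require "json"

# Hypothetical helper that mirrors the parsing steps in listen.
def extract_message_data(raw)
  parsed = begin
    JSON.parse(raw)
  rescue StandardError
    raw # not valid JSON: the handler receives the raw string
  end

  return parsed unless parsed.is_a?(Hash)

  # An envelope is unwrapped only when all three keys hold truthy values.
  id, data, pattern = parsed.values_at("id", "data", "pattern")
  id && data && pattern ? data : parsed
end

plain_object = extract_message_data('{"foo":"bar"}')
plain_array  = extract_message_data("[1, 2, 3]")
raw_text     = extract_message_data("not json")
unwrapped    = extract_message_data('{"id":"abc","pattern":"some.subject","data":{"foo":"bar"}}')
partial      = extract_message_data('{"id":"abc","data":{"foo":"bar"}}')

p plain_object # {"foo"=>"bar"} (string keys)
p unwrapped    # only the "data" value of the envelope
p partial      # the whole object, because "pattern" is missing
```

One consequence of the truthiness check is that an envelope whose "data" value is null or false is not unwrapped, so the handler receives the whole object in that case.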
log(text, level: :info, indent: 0)
# File lib/natsy/client.rb, line 147
def log(text, level: :info, indent: 0)
  Utils.log(Config.logger, text, level: level, indent: indent)
end
register_reply!(subject:, handler:, queue: nil)
# File lib/natsy/client.rb, line 191
def register_reply!(subject:, handler:, queue: nil)
  raise ArgumentError, "Subject must be a string" unless subject.is_a?(String)
  raise ArgumentError, "Must provide a message handler for #{subject}" unless handler.respond_to?(:call)
  raise ArgumentError, "Already registered a reply to #{subject}" if reply_registered?(subject)

  reply = {
    subject: subject,
    handler: handler,
    queue: Utils.presence(queue) || Config.default_queue,
  }

  replies << reply

  current_thread.raise(NewSubscriptionsError, "New reply registered") if started?
end
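
The three ArgumentError checks in register_reply! have a practical consequence: a subject can be registered only once, regardless of queue, and a call without a block is rejected. The sketch below reproduces just those checks in a hypothetical ReplyRegistry class (not part of Natsy) so the behavior can be observed without a NATS server.

```ruby
# Hypothetical registry that copies only the three checks from register_reply!.
class ReplyRegistry
  def initialize
    @replies = []
  end

  def register(subject:, handler:, queue: nil)
    raise ArgumentError, "Subject must be a string" unless subject.is_a?(String)
    raise ArgumentError, "Must provide a message handler for #{subject}" unless handler.respond_to?(:call)
    raise ArgumentError, "Already registered a reply to #{subject}" if @replies.any? { |r| r[:subject] == subject }

    @replies << { subject: subject, handler: handler, queue: queue }
  end
end

registry = ReplyRegistry.new
registry.register(subject: "some.subject", handler: proc { "ok" }, queue: "foobar")

# Same subject, different queue: still rejected, because only subjects are compared.
begin
  registry.register(subject: "some.subject", handler: proc { "ok" }, queue: "barbaz")
rescue ArgumentError => e
  puts e.message
end

# A missing block arrives as nil, which does not respond to call.
begin
  registry.register(subject: "other.subject", handler: nil)
rescue ArgumentError => e
  puts e.message
end
```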
replies()
# File lib/natsy/client.rb, line 182
def replies
  @replies ||= []
end
reply_registered?(raw_subject)
# File lib/natsy/client.rb, line 186
def reply_registered?(raw_subject)
  subject = raw_subject.to_s
  replies.any? { |reply| reply[:subject] == subject }
end
restart!()
# File lib/natsy/client.rb, line 167
def restart!
  log("Restarting NATS", level: :warn)
  stop!
  kill!
  start!
end
started!()
# File lib/natsy/client.rb, line 174
def started!
  @started = true
end
stop!()
# File lib/natsy/client.rb, line 155
def stop!
  log("Stopping NATS", level: :debug)

  begin
    NATS.stop
  rescue StandardError
    nil
  end

  stopped!
end
stopped!()
# File lib/natsy/client.rb, line 178
def stopped!
  @started = false
end
threads()
# File lib/natsy/client.rb, line 139
def threads
  @threads ||= []
end