class NATS::RPC::Servant

Public Class Methods

new(id: SecureRandom.uuid, **cluster_opts) click to toggle source
# File lib/nats/rpc/servant.rb, line 4
def initialize(id: SecureRandom.uuid, **cluster_opts)
  @id = id

  @nats = NATS::IO::Client.new

  @nats.connect(if cluster_opts.keys.size > 0
    cluster_opts
  else
    NATS::RPC.cluster_opts
  end)

  @count_json_parse_errors = 0
  @count_block_call_errors = 0
  @count_to_json_errors = 0
  @count_messages = 0

  @serve_thread = nil
end

Public Instance Methods

kill() click to toggle source
# File lib/nats/rpc/servant.rb, line 29
def kill
  return unless @serve_thread
  @serve_thread.kill
end
serve(subscribe_to, opts={}, &block) click to toggle source
# File lib/nats/rpc/servant.rb, line 23
def serve(subscribe_to, opts={}, &block)
  @serve_thread = Thread.new do
    self.serve! subscribe_to, opts, &block
  end
end
serve!(subscribe_to, opts={}, &block) click to toggle source
# File lib/nats/rpc/servant.rb, line 34
def serve!(subscribe_to, opts={}, &block)
  sid = @nats.subscribe subscribe_to, opts do |msg_json, reply, subject|
    params = nil
    json_parse_exception = nil
    begin
      params = JSON.parse(msg_json)
    rescue => ex
      @count_json_parse_errors = @count_json_parse_errors + 1
      json_parse_exception = ex
    end

    return @nats.publish reply, error_message(1.0, json_parse_exception.message) if json_parse_exception

    value = nil
    block_call_started_at = Time.now
    block_call_exception = nil
    begin
      value = block.call params, subject
    rescue => ex
      @count_block_call_errors = @count_block_call_errors + 1
      block_call_exception = ex
    end
    block_call_stopped_at = Time.now

    if block_call_exception
      err_msg = error_message(2.0, {
        exception: block_call_exception.class.name,
        message: block_call_exception.message,
        backtrace: block_call_exception.backtrace
      })

      return @nats.publish reply, err_msg
    end

    value_as_json = nil
    begin
      value_as_json = value.to_json
    rescue => ex
      @count_to_json_errors = @count_to_json_errors + 1
      value_to_json_exception = ex
    end

    return @nats.publish reply, error_message(3.0, value_to_json_exception.message) if value_to_json_exception

    response = {
      status: "ok",
      payload: value_as_json,
      took: (block_call_stopped_at - block_call_started_at).floor(2),
      servant: @id
    }

    @nats.publish(reply, response.to_json)

    @count_messages = @count_messages + 1
  end

  last_count_messages = 0
  loop do
    throughput = (last_count_messages - @count_messages).abs
    NATS::RPC.stats "s: #{subscribe_to} q: #{opts[:queue]} - msg: #{@count_messages} tput: #{throughput}/sĀ errs json_parse: #{@count_json_parse_errors} block_call: #{@count_block_call_errors} to_json: #{@count_to_json_errors}"
    last_count_messages = @count_messages
    sleep 1
  end
end

Private Instance Methods

error_message(code, data) click to toggle source
# File lib/nats/rpc/servant.rb, line 101
def error_message(code, data)
  {
    status: "error",
    code: code,
    payload: data.to_json,
    servant: @id
  }.to_json
end