class RemoteService::Connector::Nats

Attributes

brokers[R]
connection[R]

Public Class Methods

new(brokers) click to toggle source
# File lib/remote_service/connector/nats.rb, line 8
def initialize(brokers)
  @brokers = brokers
  @mutex = Mutex.new
end

Public Instance Methods

exit() click to toggle source
# File lib/remote_service/connector/nats.rb, line 18
def exit
  @conn_thread.exit
end
publish(to_queue, message) click to toggle source
# File lib/remote_service/connector/nats.rb, line 22
def publish(to_queue, message)
  @mutex.synchronize do
    NATS.publish(to_queue, message)
  end
end
request(to_queue, message, &block) click to toggle source
# File lib/remote_service/connector/nats.rb, line 28
def request(to_queue, message, &block)
  @mutex.synchronize do
    NATS.request(to_queue, message, &block)
  end
end
start(&block) click to toggle source
# File lib/remote_service/connector/nats.rb, line 13
def start(&block)
  return connection_thread if !block_given?
  connect(&block)
end
subscribe(service_queue, &block) click to toggle source
# File lib/remote_service/connector/nats.rb, line 34
def subscribe(service_queue, &block)
  NATS.subscribe(service_queue, queue: service_queue, &block)
end

Private Instance Methods

connect() { |nil| ... } click to toggle source
# File lib/remote_service/connector/nats.rb, line 51
def connect
  NATS.on_error do |error|
    yield(nil)
    raise Errors::ConnectionFailedError, 'Connection to NATS cluster failed'
  end
  NATS.start(connect_options) do |connection|
    RemoteService.logger.info "CONNECTED: #{connection.connected_server}"
    RemoteService.logger.info "SERVERS IN POOL: #{connection.server_pool.count}"
    connection.on_reconnect do
      RemoteService.logger.info "RECONNECT, NEW_NODE: #{connection.connected_server}"
    end
    connection.on_disconnect do |reason|
      RemoteService.logger.info "DISCONNECTED: #{reason}"
    end
    yield(connection)
  end
end
connect_options() click to toggle source
# File lib/remote_service/connector/nats.rb, line 42
def connect_options
  {
    dont_randomize_servers: true,
    reconnect_time_wait: ENV.fetch('REMOTE_SERVICE_NATS_RECONNECT_WAIT', 0.2),
    max_reconnect_attempts: ENV.fetch('REMOTE_SERVICE_NATS_RECONNECT_ATTEMPTS', 5),
    servers: @brokers
  }
end
connection_thread() click to toggle source
# File lib/remote_service/connector/nats.rb, line 69
def connection_thread
  lock = Util::Lock.new
  @conn_thread = Thread.new do
    connect do |connection|
      lock.unlock(connection)
    end
  end
  connection = lock.wait.first
  raise Errors::ConnectionFailedError, 'Connection to NATS cluster failed' unless connection
end