class FastlyNsq::Producer

Provides an adapter to an Nsq::Producer, used to write messages to the queue.

@example

producer = FastlyNsq::Producer.new(topic: 'topic')
producer.write('my message')

Constants

DEFAULT_CONNECTION_TIMEOUT

Attributes

connect_timeout[R]

@return [Integer] connection timeout in seconds

connection[R]

@return [Nsq::Producer]

logger[R]

@return [Logger]

tls_options[R]
topic[R]

@return [String] NSQ Topic

Public Class Methods

new(topic:, tls_options: nil, logger: FastlyNsq.logger, connect_timeout: DEFAULT_CONNECTION_TIMEOUT)

Create a FastlyNsq::Producer

@param topic [String] NSQ topic on which to deliver the message

@param tls_options [Hash] Hash of TLS options passed to the connection. In most cases this should be nil unless you need to override the default values set in ENV.

@param logger [Logger] defaults to FastlyNsq.logger

@param connect_timeout [Integer] NSQ connection timeout in seconds

# File lib/fastly_nsq/producer.rb, line 33
def initialize(topic:, tls_options: nil, logger: FastlyNsq.logger, connect_timeout: DEFAULT_CONNECTION_TIMEOUT)
  @topic           = topic
  @tls_options     = FastlyNsq::TlsOptions.as_hash(tls_options)
  @connect_timeout = connect_timeout
  @logger          = logger

  connect
end

Public Instance Methods

connect()

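Create an Nsq::Producer and set it as the +@connection+ instance variable

@return [Boolean] true once the connection is established

@raise [FastlyNsq::ConnectionFailed] if the connection is not established within connect_timeout seconds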

# File lib/fastly_nsq/producer.rb, line 73
def connect
  lookupd = FastlyNsq.lookupd_http_addresses

  @connection ||= Nsq::Producer.new(
    tls_options.merge(
      nsqlookupd:  lookupd,
      topic:       topic,
    ),
  )

  timeout_args = [connect_timeout, FastlyNsq::ConnectionFailed]

  if RUBY_VERSION > '2.4.0'
    timeout_args << "Failed connection to #{lookupd} within #{connect_timeout} seconds"
  end

  Timeout.timeout(*timeout_args) { Thread.pass until connection.connected? }

  true
rescue FastlyNsq::ConnectionFailed
  logger.error { "Producer for #{topic} failed to connect!" }
  terminate
  raise
end
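
The wait loop in connect can be tried in isolation. The following is a minimal sketch, not the library's code: ConnectionFailed, DelayedConnection, and wait_for_connection are hypothetical stand-ins for FastlyNsq::ConnectionFailed, an Nsq::Producer, and the body of connect. It assumes a Ruby version whose Timeout.timeout accepts a message as its third argument.

```ruby
require 'timeout'

# Hypothetical stand-in for FastlyNsq::ConnectionFailed.
class ConnectionFailed < StandardError; end

# Hypothetical stand-in for an Nsq::Producer that only reports
# connected once ready_after seconds have elapsed.
class DelayedConnection
  def initialize(ready_after)
    @ready_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + ready_after
  end

  def connected?
    Process.clock_gettime(Process::CLOCK_MONOTONIC) >= @ready_at
  end
end

# Poll until the connection reports connected. If connect_timeout
# seconds pass first, Timeout raises ConnectionFailed with the message.
def wait_for_connection(connection, connect_timeout)
  message = "Failed connection within #{connect_timeout} seconds"
  Timeout.timeout(connect_timeout, ConnectionFailed, message) do
    Thread.pass until connection.connected?
  end
  true
end

# Connects well inside the timeout, so this returns true.
wait_for_connection(DelayedConnection.new(0.01), 1)

# Never connects in time, so ConnectionFailed is raised and rescued.
begin
  wait_for_connection(DelayedConnection.new(5), 0.05)
rescue ConnectionFailed => e
  warn e.message
end
```

Passing the error class to Timeout.timeout means the caller rescues one domain-specific exception rather than Timeout::Error, which is the same shape as the rescue clause in connect.
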
connected?()

Check connection status

@return [Boolean] false when there is no connection, otherwise the result of Nsq::Producer#connected?

@see www.rubydoc.info/gems/nsq-ruby/Nsq/ClientBase#connected%3F-instance_method Nsq::ClientBase#connected?

# File lib/fastly_nsq/producer.rb, line 55
def connected?
  return false unless connection

  connection.connected?
end
terminate()

Terminate the NSQ connection and set the connection instance to nil

@return [nil]

@see www.rubydoc.info/gems/nsq-ruby/Nsq%2FClientBase:terminate Nsq::ClientBase#terminate

# File lib/fastly_nsq/producer.rb, line 46
def terminate
  connection.terminate
  @connection = nil
end
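
Because terminate calls connection.terminate without checking for nil, calling it on a producer that has already been terminated raises NoMethodError. The sketch below reproduces only that logic with hypothetical stand-ins (StubConnection, SketchProducer). The safe_terminate variant is an assumption for illustration and is not part of FastlyNsq.

```ruby
# Hypothetical stand-in for Nsq::Producer.
class StubConnection
  def terminate
    :terminated
  end
end

# Minimal stand-in that copies only the terminate logic shown above.
class SketchProducer
  attr_reader :connection

  def initialize
    @connection = StubConnection.new
  end

  # Same body as the documented method: fails if connection is nil.
  def terminate
    connection.terminate
    @connection = nil
  end

  # Guarded variant (an assumption, not library code): the safe
  # navigation operator skips the call when connection is already nil.
  def safe_terminate
    connection&.terminate
    @connection = nil
  end
end

producer = SketchProducer.new
producer.terminate
begin
  producer.terminate # connection is nil here
rescue NoMethodError => e
  warn "second terminate failed: #{e.class}"
end
```
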
write(message)

Write a message

@raise [FastlyNsq::NotConnectedError] if the producer is not connected

@return [Nsq::Producer#write]

@see www.rubydoc.info/gems/nsq-ruby/Nsq%2FProducer:write Nsq::Producer#write

# File lib/fastly_nsq/producer.rb, line 65
def write(message)
  raise FastlyNsq::NotConnectedError unless connected?
  connection.write(*message)
end
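
The splat in connection.write(*message) means a single string is forwarded as one message, while an array is spread into several. A minimal sketch of that forwarding follows, assuming the underlying write accepts a variable number of messages. RecordingConnection and forward are hypothetical names used only for illustration.

```ruby
# Hypothetical stand-in for Nsq::Producer that records every message
# it receives instead of sending it to NSQ.
class RecordingConnection
  attr_reader :written

  def initialize
    @written = []
  end

  # Variadic write: each argument is treated as one message.
  def write(*messages)
    @written.concat(messages)
  end
end

# Mirrors the forwarding in the method above: splat the argument so
# that a String becomes one message and an Array becomes many.
def forward(connection, message)
  connection.write(*message)
end

connection = RecordingConnection.new
forward(connection, 'single message')
forward(connection, ['first', 'second'])
connection.written # => ["single message", "first", "second"]
```
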