class FastlyNsq::Consumer

Provides an adapter to an Nsq::Consumer and is used to read messages off the queue.

@example

consumer = FastlyNsq::Consumer.new(
  topic: 'topic',
  channel: 'channel'
)
consumer.size #=> 1
message = consumer.pop
message.body #=> "{ 'data': { 'key': 'value' } }"
message.finish
consumer.size #=> 0
consumer.terminate

Constants

DEFAULT_CONNECTION_TIMEOUT

Default NSQ connection timeout in seconds

Attributes

channel[R]

@return [String] NSQ Channel

connect_timeout[R]

@return [Integer] connection timeout in seconds

connection[R]

@return [Nsq::Consumer]

max_attempts[R]

@return [Integer] maximum number of times an NSQ message will be attempted

tls_options[R]
topic[R]

@return [String] NSQ Topic

Public Class Methods

new(topic:, channel:, queue: nil, tls_options: nil, connect_timeout: DEFAULT_CONNECTION_TIMEOUT, max_attempts: FastlyNsq.max_attempts, **options)

Create a FastlyNsq::Consumer

@param topic [String] NSQ topic from which to consume
@param channel [String] NSQ channel from which to consume
@param queue [#pop, #size] Queue object, most likely an instance of {FastlyNsq::Feeder}
@param tls_options [Hash] Hash of TLS options passed to the connection.

In most cases this should be nil unless you need to override the
default values set in ENV.

@param connect_timeout [Integer] NSQ connection timeout in seconds
@param max_attempts [Integer] maximum number of times an NSQ message will be attempted

When set to +nil+, attempts will be unlimited.

@param options [Hash] additional options forwarded to the connection constructor

@example

consumer = FastlyNsq::Consumer.new(
  topic: 'topic',
  channel: 'channel'
)
# File lib/fastly_nsq/consumer.rb, line 80
def initialize(topic:, channel:, queue: nil, tls_options: nil, connect_timeout: DEFAULT_CONNECTION_TIMEOUT, max_attempts: FastlyNsq.max_attempts, **options)
  @topic           = topic
  @channel         = channel
  @tls_options     = FastlyNsq::TlsOptions.as_hash(tls_options)
  @connect_timeout = connect_timeout
  @max_attempts    = max_attempts

  @connection = connect(queue, **options)
end

Public Instance Methods

empty?()

Is the message queue empty?

@return [Boolean]

# File lib/fastly_nsq/consumer.rb, line 93
def empty?
  size.zero?
end
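
A minimal sketch of how empty? can drive a drain loop. FakeConsumer is a hypothetical stand-in, not part of FastlyNsq: it wraps Ruby's core Queue and exposes size and pop as the class-level example does, with empty? defined the same way as above. A real consumer would also call finish on each message.

```ruby
# Hypothetical stand-in for a consumer, backed by Ruby's core Queue.
# It only mirrors the #size / #pop / #empty? surface shown in this page.
class FakeConsumer
  def initialize(messages)
    @queue = Queue.new
    messages.each { |message| @queue << message }
  end

  def size
    @queue.size
  end

  def pop
    @queue.pop
  end

  # Same definition as FastlyNsq::Consumer#empty?
  def empty?
    size.zero?
  end
end

consumer = FakeConsumer.new(%w[first second])
processed = []

# Pop messages until nothing is left in the local queue.
processed << consumer.pop until consumer.empty?
```

Checking empty? before each pop avoids blocking on an empty queue in this single-threaded sketch.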

Private Instance Methods

connect(queue, **options)
# File lib/fastly_nsq/consumer.rb, line 101
def connect(queue, **options)
  Nsq::Consumer.new(
    {
      nsqlookupd: FastlyNsq.lookupd_http_addresses,
      topic: topic,
      channel: channel,
      queue: queue,
      max_attempts: max_attempts,
      **options,
    }.merge(tls_options),
  )
end
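
The merge order in connect determines which value wins when a key appears more than once. The sketch below reproduces only the hash construction, so it runs without an NSQ connection. The helper build_connection_options, the lookupd address, and the extra keys (some_option, some_tls_key) are all illustrative assumptions, not part of FastlyNsq.

```ruby
# Hypothetical helper that mirrors the hash built inside #connect.
def build_connection_options(topic:, channel:, queue: nil, max_attempts: nil, tls_options: {}, **options)
  {
    nsqlookupd: ['127.0.0.1:4161'], # placeholder for FastlyNsq.lookupd_http_addresses
    topic: topic,
    channel: channel,
    queue: queue,
    max_attempts: max_attempts,
    **options,           # caller-supplied keys override the defaults above
  }.merge(tls_options)   # TLS keys are merged last, so they take precedence
end

opts = build_connection_options(
  topic: 'topic',
  channel: 'channel',
  max_attempts: 5,
  tls_options: { some_tls_key: true }, # placeholder TLS key
  some_option: 30,                     # placeholder extra option
)
```

Because tls_options is merged after the splatted options, a key present in both takes its value from tls_options.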