class FastlyNsq::Consumer
Provides an adapter to an Nsq::Consumer and is used to read messages off the queue.
@example
  consumer = FastlyNsq::Consumer.new(
    topic: 'topic',
    channel: 'channel'
  )
  consumer.size #=> 1
  message = consumer.pop
  message.body #=> "{ 'data': { 'key': 'value' } }"
  message.finish
  consumer.size #=> 0
  consumer.terminate
Constants
- DEFAULT_CONNECTION_TIMEOUT
Default NSQ connection timeout in seconds
Attributes
- channel
@return [String] NSQ Channel
- connect_timeout
@return [Integer] connection timeout in seconds
- connection
@return [Nsq::Consumer]
- max_attempts
@return [Integer] maximum number of times an NSQ message will be attempted
- topic
@return [String] NSQ Topic
Public Class Methods
Create a FastlyNsq::Consumer
@param topic [String] NSQ topic from which to consume
@param channel [String] NSQ channel from which to consume
@param queue [#pop, #size] Queue object, most likely an instance of {FastlyNsq::Feeder}
@param tls_options [Hash] Hash of TLS options passed to the connection. In most cases this should be nil unless you need to override the default values set in ENV.
@param connect_timeout [Integer] NSQ connection timeout in seconds
@param max_attempts [Integer] maximum number of times an NSQ message will be attempted. When set to +nil+, attempts will be unlimited.
@param options [Hash] additional options forwarded to the connection constructor
@example
  consumer = FastlyNsq::Consumer.new(
    topic: 'topic',
    channel: 'channel'
  )
# File lib/fastly_nsq/consumer.rb, line 80
  def initialize(topic:, channel:, queue: nil, tls_options: nil, connect_timeout: DEFAULT_CONNECTION_TIMEOUT, max_attempts: FastlyNsq.max_attempts, **options)
    @topic = topic
    @channel = channel
    @tls_options = FastlyNsq::TlsOptions.as_hash(tls_options)
    @connect_timeout = connect_timeout
    @max_attempts = max_attempts
    @connection = connect(queue, **options)
  end
Public Instance Methods
Is the message queue empty?
@return [Boolean]
# File lib/fastly_nsq/consumer.rb, line 93
  def empty?
    size.zero?
  end
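The relationship between `size`, `pop`, and `empty?` can be illustrated without a running NSQ daemon. The following is a minimal sketch, not the library's implementation: `SketchConsumer` is a hypothetical stand-in class, and it assumes that `size` and `pop` are simply forwarded to an object responding to `#pop` and `#size` (here Ruby's built-in `Queue`).

```ruby
require 'forwardable'

# Hypothetical stand-in, NOT FastlyNsq::Consumer. It only mirrors the
# `empty?` definition shown above: empty means size is zero.
class SketchConsumer
  extend Forwardable

  # Assumption: size and pop are forwarded to the underlying queue-like object.
  def_delegators :@queue, :size, :pop

  def initialize(queue)
    @queue = queue
  end

  # Same body as the documented method.
  def empty?
    size.zero?
  end
end

queue = Queue.new                      # core Ruby thread-safe queue
consumer = SketchConsumer.new(queue)

consumer.empty?  #=> true
queue << 'message'
consumer.empty?  #=> false
consumer.pop     #=> "message"
consumer.empty?  #=> true
```

Because `empty?` is defined purely in terms of `size`, any queue object that reports its length correctly yields a consistent result.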
Private Instance Methods
# File lib/fastly_nsq/consumer.rb, line 101
  def connect(queue, **options)
    Nsq::Consumer.new(
      {
        nsqlookupd: FastlyNsq.lookupd_http_addresses,
        topic: topic,
        channel: channel,
        queue: queue,
        max_attempts: max_attempts,
        **options,
      }.merge(tls_options),
    )
  end
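The order in which `connect` assembles its parameters determines which value wins when keys collide: entries from `options` are splatted into the hash literal, and `tls_options` is merged last, so a TLS key overrides the same key passed through `options`. The sketch below reproduces only that merge pattern in plain Ruby. `build_connection_params`, the lookupd address, and the `msg_timeout` and `tls_v1` keys are illustrative assumptions, not confirmed options of the underlying connection.

```ruby
# Hypothetical helper that mirrors the hash construction in `connect`.
# It returns the parameter hash instead of opening a connection.
def build_connection_params(topic:, channel:, queue: nil, max_attempts: nil, tls_options: {}, **options)
  {
    nsqlookupd: ['127.0.0.1:4161'], # placeholder for FastlyNsq.lookupd_http_addresses
    topic: topic,
    channel: channel,
    queue: queue,
    max_attempts: max_attempts,
    **options,                      # extra options are forwarded unchanged
  }.merge(tls_options)              # merged last, so TLS keys take precedence
end

params = build_connection_params(
  topic: 'topic',
  channel: 'channel',
  msg_timeout: 60_000,              # illustrative extra option
  tls_v1: false,                    # illustrative key, overridden by tls_options
  tls_options: { tls_v1: true },
)

params[:tls_v1]      #=> true
params[:msg_timeout] #=> 60000
```

This ordering is consistent with the guidance on `tls_options`: leave it nil unless the defaults need to be overridden, since whatever it contains takes precedence.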