class FastlyNsq::Http::Nsqd

Provides an interface to the the functionality provided by the nsqd HTTP interface

@see nsq.io/components/nsqd.html

Constants

BASE_NSQD_URL
VALID_FORMATS

Attributes

client[RW]

Public Class Methods

channel_create(topic:, channel:, **args) click to toggle source

Create a channel for an existing topic

@param topic [String] the existing topic @param channel [String] the channel to create

# File lib/fastly_nsq/http/nsqd.rb, line 145
def self.channel_create(topic:, channel:, **args)
  new(request_uri: '/channel/create', **args).post(topic: topic, channel: channel)
end
channel_delete(topic:, channel:, **args) click to toggle source

Delete an existing channel for an existing topic

@param topic [String] the existing topic @param channel [String] the channel to delete

# File lib/fastly_nsq/http/nsqd.rb, line 154
def self.channel_delete(topic:, channel:, **args)
  new(request_uri: '/channel/delete', **args).post(topic: topic, channel: channel)
end
channel_empty(topic:, channel:, **args) click to toggle source

Empty all queued messages (in-memory and disk) for an existing channel

@param topic [String] the existing topic @param channel [String] the channel to empty

# File lib/fastly_nsq/http/nsqd.rb, line 163
def self.channel_empty(topic:, channel:, **args)
  new(request_uri: '/channel/empty', **args).post(topic: topic, channel: channel)
end
channel_pause(topic:, channel:, **args) click to toggle source

Pause message flow to consumers of an existing channel (messages will queue at the channel)

@param topic [String] the existing topic @param channel [String] the channel to pause

# File lib/fastly_nsq/http/nsqd.rb, line 173
def self.channel_pause(topic:, channel:, **args)
  new(request_uri: '/channel/pause', **args).post(topic: topic, channel: channel)
end
channel_unpause(topic:, channel:, **args) click to toggle source

Resume message flow to consumers of and existing, paused, channel

@param topic [String] the existing topic @param channel [String] the existing, paused, channel to unpause

# File lib/fastly_nsq/http/nsqd.rb, line 182
def self.channel_unpause(topic:, channel:, **args)
  new(request_uri: '/channel/unpause', **args).post(topic: topic, channel: channel)
end
config_nsqlookupd_tcp_addresses(**args) click to toggle source

List of nsqlookupd TCP addresses

# File lib/fastly_nsq/http/nsqd.rb, line 95
def self.config_nsqlookupd_tcp_addresses(**args)
  new(request_uri: '/config/nsqlookupd_tcp_addresses', **args).get
end
info(**args) click to toggle source

NSQ version information

# File lib/fastly_nsq/http/nsqd.rb, line 35
def self.info(**args)
  new(request_uri: '/info', **args).get
end
mpub(topic:, binary: false, message:, **args) click to toggle source

Publish multiple messages in one roundtrip

NOTE: by default /mpub expects messages to be delimited by \n, use the

+binary: true+ parameter to enable binary mode where message body
is expected to be in the following format (the HTTP Content-Length
header should be sent as the total size of the POST body):
   [ 4-byte num messages ]
   [ 4-byte message #1 size ][ N-byte binary data ]
       ... (repeated <num_messages> times)

TODO: setup Content-Legth header when binary is passed.

@param topic [String] the topic to publish to @param binary [Boolean] enables binary mode @param message the messages to send with n used to seperate messages

# File lib/fastly_nsq/http/nsqd.rb, line 86
def self.mpub(topic:, binary: false, message:, **args)
  binary_param = binary ? 'true' : 'false'
  raise NotImplementedError, 'binary mode has yet to be implemented' if binary
  params = { topic: topic, binary: binary_param }
  new(request_uri: '/mpub', **args).post(params, message)
end
new(request_uri:, base_uri: BASE_NSQD_URL, adapter: FastlyNsq::Http) click to toggle source

Nsqd http wrapper. Provides a simple interface to all NSQD http api's @see nsq.io/components/nsqd.html

@attr [String] request_uri the request you would like to call ie: '/thing' @attr [String] base_uri the host, port, and protocol of your nsqd @attr [Object] adapter the http adapter you would like to use…

# File lib/fastly_nsq/http/nsqd.rb, line 193
def initialize(request_uri:, base_uri: BASE_NSQD_URL, adapter: FastlyNsq::Http)
  @base_uri = base_uri
  @adapter = adapter
  uri = URI.join(@base_uri, request_uri)

  @client = @adapter.new(uri: uri)
  @client.use_ssl
end
ping(**args) click to toggle source

Monitoring endpoint, should return 200 OK. It returns an HTTP 500 if it is not healthy.

NOTE: The only “unhealthy” state is if nsqd failed to write messages to disk when overflow occurred.

# File lib/fastly_nsq/http/nsqd.rb, line 29
def self.ping(**args)
  new(request_uri: '/ping', **args).get
end
pub(topic:, message:, defer: nil, **args) click to toggle source

Publish a message

@param topic [String] the topic to publish to @param defer [String] the time in ms to delay message delivery @param message the message body

# File lib/fastly_nsq/http/nsqd.rb, line 64
def self.pub(topic:, message:, defer: nil, **args)
  params = { topic: topic }
  params[:defer] = defer if defer
  new(request_uri: '/pub', **args).post(params, message)
end
stats(topic: nil, channel: nil, format: 'json', **args) click to toggle source

Return Internal Statistics

@param topic [String] filter to topic @param channel [String] filter to channel @param format [String] can be text or json

@raise [InvaildFormatError] provided format is not in list of valid formats

@example Fetch Statistics for topic: 'foo', channel: 'bar' as text

Nsqd.stats(topic: 'foo', channel: 'bar', format: 'text')
# File lib/fastly_nsq/http/nsqd.rb, line 50
def self.stats(topic: nil, channel: nil, format: 'json', **args)
  raise InvalidFormatError unless VALID_FORMATS.include?(format)
  params = { format: format }
  params[:topic] = topic if topic
  params[:channel] = channel if channel
  new(request_uri: '/stats', **args).get(params)
end
topic_create(topic:, **args) click to toggle source

Create a topic

@param topic [String] the topic to create

# File lib/fastly_nsq/http/nsqd.rb, line 103
def self.topic_create(topic:, **args)
  new(request_uri: '/topic/create', **args).post(topic: topic)
end
topic_delete(topic:, **args) click to toggle source

Delete a topic (and all of its channels)

@param topic [String] the existing topic to delete

# File lib/fastly_nsq/http/nsqd.rb, line 111
def self.topic_delete(topic:, **args)
  new(request_uri: '/topic/delete', **args).post(topic: topic)
end
topic_empty(topic:, **args) click to toggle source

Empty all the queued messages (in-memory and disk) for an existing topic

@param topic [String] the existing topic to empty

# File lib/fastly_nsq/http/nsqd.rb, line 119
def self.topic_empty(topic:, **args)
  new(request_uri: '/topic/empty', **args).post(topic: topic)
end
topic_pause(topic:, **args) click to toggle source

Pause message flow to all channels on an existing topic (messages will queue at the topic)

@param topic [String] the existing topic to pause

# File lib/fastly_nsq/http/nsqd.rb, line 128
def self.topic_pause(topic:, **args)
  new(request_uri: '/topic/pause', **args).post(topic: topic)
end
topic_unpause(topic:, **args) click to toggle source

Unpause message flow to all channels of an existing, paused, topic

@param topic [String] the existing, paused topic to unpause

# File lib/fastly_nsq/http/nsqd.rb, line 136
def self.topic_unpause(topic:, **args)
  new(request_uri: '/topic/unpause', **args).post(topic: topic)
end