class Nsq::Discovery

Public Class Methods

new(lookupds) click to toggle source

lookupd addresses must be formatted like so: ‘<host>:<http-port>’

# File lib/nsq/discovery.rb, line 13
def initialize(lookupds)
  @lookupds = lookupds
end

Public Instance Methods

nsqds() click to toggle source

Returns an array of nsqds instances

nsqd instances returned are strings in this format: ‘<host>:<tcp-port>’

discovery.nsqds
#=> ['127.0.0.1:4150', '127.0.0.1:4152']

If all nsqlookupd’s are unreachable, raises Nsq::DiscoveryException

# File lib/nsq/discovery.rb, line 26
def nsqds
  gather_nsqds_from_all_lookupds do |lookupd|
    get_nsqds(lookupd)
  end
end
nsqds_for_topic(topic) click to toggle source

Returns an array of nsqds instances that have messages for that topic.

nsqd instances returned are strings in this format: ‘<host>:<tcp-port>’

discovery.nsqds_for_topic('a-topic')
#=> ['127.0.0.1:4150', '127.0.0.1:4152']

If all nsqlookupd’s are unreachable, raises Nsq::DiscoveryException

# File lib/nsq/discovery.rb, line 42
def nsqds_for_topic(topic)
  gather_nsqds_from_all_lookupds do |lookupd|
    get_nsqds(lookupd, topic)
  end
end

Private Instance Methods

gather_nsqds_from_all_lookupds() { |lookupd| ... } click to toggle source
# File lib/nsq/discovery.rb, line 51
def gather_nsqds_from_all_lookupds
  nsqd_list = @lookupds.map do |lookupd|
    yield(lookupd)
  end.flatten

  # All nsqlookupds were unreachable, raise an error!
  if nsqd_list.length > 0 && nsqd_list.all? { |nsqd| nsqd.nil? }
    raise DiscoveryException
  end

  nsqd_list.compact.uniq
end
get_nsqds(lookupd, topic = nil) click to toggle source

Returns an array of nsqd addresses If there’s an error, return nil

# File lib/nsq/discovery.rb, line 66
def get_nsqds(lookupd, topic = nil)
  uri_scheme = 'http://' unless lookupd.match(%r(https?://))
  uri = URI.parse("#{uri_scheme}#{lookupd}")

  uri.query = "ts=#{Time.now.to_i}"
  if topic
    uri.path = '/lookup'
    uri.query += "&topic=#{topic}"
  else
    uri.path = '/nodes'
  end

  begin
    body = Net::HTTP.get(uri)
    data = JSON.parse(body)

    if data['data'] && data['data']['producers']
      data['data']['producers'].map do |producer|
        "#{producer['broadcast_address']}:#{producer['tcp_port']}"
      end
    else
      []
    end
  rescue Exception => e
    error "Error during discovery for #{lookupd}: #{e}"
    nil
  end
end