class Deployable::Zmq::Subscribe

Deployable::Zmq provides a generic set of helpers so you don't have to do so much legwork.

Public Class Methods

new(options = {}) click to toggle source
# File lib/deployable/zmq/subscribe.rb, line 19
# Build a ZMQ SUB socket, connect it to a publisher and optionally install
# a subscription filter.
#
# options - Hash of settings:
#   :port      - port to connect to. Defaults to __class_ivg('port')
#                (presumably a class-level default - confirm), then to
#                DEFAULT_BIND_PORT when that is nil.
#   :address   - host to connect to. Falls back to the legacy :options key
#                and then to __class_ivg('address').
#   :subscribe - when non-nil a subscription filter is set on the socket.
#                A String is used as the filter; any other non-nil value
#                keeps the historical 'REQ_COMP' filter.
#
# Raises RuntimeError when the address is nil or '*', or when the connect or
# setsockopt call does not return 0.
def initialize options = {}
  @context    = ZMQ::Context.new
  @subscriber = @context.socket ZMQ::SUB

  @port       = options.fetch :port, __class_ivg('port')
  @port       = DEFAULT_BIND_PORT if @port.nil?

  # The address used to be read from the mistyped :options key only; that
  # key is still honoured so existing callers keep working.
  legacy_address = options.fetch :options, __class_ivg('address')
  @address       = options.fetch :address, legacy_address
  raise "No valid address [#{@address}]" if @address.nil? || @address == '*'

  url = "tcp://#{@address}:#{@port}"

  # Assign the return code before testing it: `rc = call == 0` stores a
  # boolean, and referencing rc in a modifier-form raise ahead of its
  # assignment is a NameError rather than the intended message.
  rc = @subscriber.connect( url )
  raise "Failed to connect to [#{url}] [#{rc}]" unless rc == 0

  #log.debug "zmq subscribing to [#{url}]"

  @subscribe = options.fetch( :subscribe, nil )

  unless @subscribe.nil?
    # Use the caller's filter instead of always subscribing to 'REQ_COMP'.
    # Non-String values (e.g. true) keep the old hard-coded filter.
    topic = @subscribe.is_a?( String ) ? @subscribe : 'REQ_COMP'
    rc = @subscriber.setsockopt( ZMQ::SUBSCRIBE, topic )
    raise "Failed to subscribe to [#{url}] [#{rc}]" unless rc == 0
  end
end

Public Instance Methods

end() click to toggle source

End a subscription

# File lib/deployable/zmq/subscribe.rb, line 59
# Close the subscriber socket held in @subscriber.
#
# Returns whatever the socket's #close returns.
def end
  socket = @subscriber
  socket.close
end
go(callback) click to toggle source

Watch a queue for messages

# File lib/deployable/zmq/subscribe.rb, line 52
# Repeatedly take a message via #receive and hand it to the callback.
#
# callback - any object responding to #call; it is invoked with the value
#            returned by #receive on every iteration.
#
# The loop has no exit condition of its own.
def go(callback)
  loop { callback.call(receive) }
end
receive() click to toggle source

Receive a message off the queue

# File lib/deployable/zmq/subscribe.rb, line 45
# Read one message from the subscriber socket.
#
# A fresh array is passed to the socket's #recv_strings, which is expected
# to fill it; the return value of #recv_strings itself is not inspected.
#
# Returns the array handed to #recv_strings.
def receive
  frames = []
  @subscriber.recv_strings(frames)
  #log.debug "a queue item [%s]\n", frames.join(' ')
  frames
end