class EZMQ::Subscriber

Subscribe socket that listens for messages with an optional topic.

Public Class Methods

new(mode = :connect, **options) click to toggle source

Creates a new Subscriber socket.

@note The default behaviour is to output and messages received to STDOUT.

@param [:bind, :connect] mode (:connect) a mode for the socket. @param [Hash] options optional parameters. @option options [String] topic a topic to subscribe to. @see EZMQ::Socket EZMQ::Socket for optional parameters.

@return [Publisher] a new instance of Publisher.

Calls superclass method EZMQ::Socket::new
# File lib/ezmq/subscribe.rb, line 18
def initialize(mode = :connect, **options)
  super mode, ZMQ::SUB, options
  subscribe options[:topic] if options[:topic]
end

Public Instance Methods

listen(&block) click to toggle source

Like receive, but doesn't stop at one message.

@yield [message, topic] passes the message body and topic to the block. @yieldparam [String] message the message received. @yieldparam [String] topic the topic of the message.

@return [void]

# File lib/ezmq/subscribe.rb, line 58
def listen(&block)
  loop do
    block.call(*receive)
  end
end
receive(**options) { |decoded, message| ... } click to toggle source

Receive a message from the socket.

@note This method blocks until a message arrives.

@param [Hash] options optional parameters. @option options [lambda] decode how to decode the message.

@yield [message, topic] passes the message body and topic to the block. @yieldparam [Object] message the message received (decoded). @yieldparam [String] topic the topic of the message.

@return [Object] the message received (decoded).

# File lib/ezmq/subscribe.rb, line 36
def receive(**options)
  message = ''
  @socket.recv_string message

  message = message.match(/^(?<topic>[^\ ]*)\ (?<body>.*)/m)

  decoded = (options[:decode] || @decode).call message['body']
  if block_given?
    yield decoded, message['topic']
  else
    [decoded, message['topic']]
  end
end
subscribe(topic) click to toggle source

Establishes a new message filter on the socket.

@note By default, a Subscriber filters all incoming messages. Without calling subscribe at least once, no messages will be accepted. If topic was provided, initialize calls subscribe automatically.

@param [String] topic a topic to subscribe to. Messages matching this prefix will be accepted.

@return [Boolean] was subscription successful?

# File lib/ezmq/subscribe.rb, line 75
def subscribe(topic)
  @socket.setsockopt(ZMQ::SUBSCRIBE, topic) == 0
end
unsubscribe(topic) click to toggle source

Removes a message filter (as set with subscribe) from the socket.

@param [String] topic the topic to unsubscribe from. If multiple filters

with the same topic are set, this will only remove one.

@return [Boolean] was unsubscription successful?

# File lib/ezmq/subscribe.rb, line 86
def unsubscribe(topic)
  @socket.setsockopt(ZMQ::UNSUBSCRIBE, topic) == 0
end