class EZMQ::Subscriber
Subscribe socket that listens for messages with an optional topic.
Public Class Methods
Creates a new Subscriber
socket.
@note The default behaviour is to output and messages received to STDOUT.
@param [:bind, :connect] mode (:connect) a mode for the socket. @param [Hash] options optional parameters. @option options [String] topic a topic to subscribe to. @see EZMQ::Socket
EZMQ::Socket
for optional parameters.
@return [Publisher] a new instance of Publisher
.
EZMQ::Socket::new
# File lib/ezmq/subscribe.rb, line 18 def initialize(mode = :connect, **options) super mode, ZMQ::SUB, options subscribe options[:topic] if options[:topic] end
Public Instance Methods
Like receive, but doesn't stop at one message.
@yield [message, topic] passes the message body and topic to the block. @yieldparam [String] message the message received. @yieldparam [String] topic the topic of the message.
@return [void]
# File lib/ezmq/subscribe.rb, line 58 def listen(&block) loop do block.call(*receive) end end
Receive a message from the socket.
@note This method blocks until a message arrives.
@param [Hash] options optional parameters. @option options [lambda] decode how to decode the message.
@yield [message, topic] passes the message body and topic to the block. @yieldparam [Object] message the message received (decoded). @yieldparam [String] topic the topic of the message.
@return [Object] the message received (decoded).
# File lib/ezmq/subscribe.rb, line 36 def receive(**options) message = '' @socket.recv_string message message = message.match(/^(?<topic>[^\ ]*)\ (?<body>.*)/m) decoded = (options[:decode] || @decode).call message['body'] if block_given? yield decoded, message['topic'] else [decoded, message['topic']] end end
Establishes a new message filter on the socket.
@note By default, a Subscriber
filters all incoming messages. Without calling subscribe at least once, no messages will be accepted. If topic was provided, initialize calls subscribe
automatically.
@param [String] topic a topic to subscribe to. Messages matching this prefix will be accepted.
@return [Boolean] was subscription successful?
# File lib/ezmq/subscribe.rb, line 75 def subscribe(topic) @socket.setsockopt(ZMQ::SUBSCRIBE, topic) == 0 end
Removes a message filter (as set with subscribe) from the socket.
@param [String] topic the topic to unsubscribe from. If multiple filters
with the same topic are set, this will only remove one.
@return [Boolean] was unsubscription successful?
# File lib/ezmq/subscribe.rb, line 86 def unsubscribe(topic) @socket.setsockopt(ZMQ::UNSUBSCRIBE, topic) == 0 end