class Aggro::ZeroMQTransport::Subscriber
Public: Handles subscribing to messages on a given endpoint.
Public Class Methods
new(endpoint, callable = nil, &block)
click to toggle source
# File lib/aggro/zeromq_transport/subscriber.rb, line 7 def initialize(endpoint, callable = nil, &block) fail ArgumentError unless callable || block_given? @callable = block_given? ? block : callable @endpoint = endpoint @mutex = Mutex.new end
Public Instance Methods
add_subscription(topic)
click to toggle source
# File lib/aggro/zeromq_transport/subscriber.rb, line 15 def add_subscription(topic) start unless @mutex.synchronize { @running } @mutex.synchronize { sub_socket.setsockopt ZMQ::SUBSCRIBE, topic } self end
start()
click to toggle source
# File lib/aggro/zeromq_transport/subscriber.rb, line 23 def start @mutex.synchronize do return self if @running sub_socket start_on_thread sleep 0.01 until @running end self end
stop()
click to toggle source
# File lib/aggro/zeromq_transport/subscriber.rb, line 36 def stop @mutex.synchronize do return self unless @running @running = false end self end
Private Instance Methods
handle_message()
click to toggle source
# File lib/aggro/zeromq_transport/subscriber.rb, line 48 def handle_message message = '' sub_socket.recv_string message @callable.call message if message.present? end
start_on_thread()
click to toggle source
# File lib/aggro/zeromq_transport/subscriber.rb, line 64 def start_on_thread Concurrent::SingleThreadExecutor.new.post do poller = ZeroMQ::Poller.new poller.register_readable sub_socket @running = true (handle_message while poller.poll(1) > 0) while @running poller.deregister_readable sub_socket sub_socket.close @sub_socket = nil end end
sub_socket()
click to toggle source
# File lib/aggro/zeromq_transport/subscriber.rb, line 55 def sub_socket @sub_socket ||= begin socket = ZeroMQTransport.context.socket(ZMQ::SUB) socket.connect @endpoint socket end end