class Aggro::ZeroMQTransport::Subscriber

Public: Subscribes to messages on a given endpoint and hands each received message to the supplied callable or block.

Public Class Methods

new(endpoint, callable = nil, &block) click to toggle source
# File lib/aggro/zeromq_transport/subscriber.rb, line 7
# Public: Build a subscriber for the given endpoint.
#
# endpoint - The endpoint the subscriber socket will connect to.
# callable - An object responding to #call that receives each message
#            (optional when a block is given).
# block    - A block that receives each message. When both a block and a
#            callable are supplied, the block is used.
#
# Raises ArgumentError if neither a callable nor a block is supplied.
def initialize(endpoint, callable = nil, &block)
  unless callable || block_given?
    fail ArgumentError, 'a callable or a block is required'
  end

  @callable = block_given? ? block : callable
  @endpoint = endpoint
  @mutex = Mutex.new
  # Explicit initial state: the flag is read by other methods before the
  # worker has ever set it.
  @running = false
end

Public Instance Methods

add_subscription(topic) click to toggle source
# File lib/aggro/zeromq_transport/subscriber.rb, line 15
# Public: Subscribe the socket to a topic, starting the subscriber first
# when it is not already running.
#
# topic - The value passed to the socket's ZMQ::SUBSCRIBE option.
#
# Returns self.
def add_subscription(topic)
  already_running = @mutex.synchronize { @running }
  start unless already_running

  @mutex.synchronize do
    sub_socket.setsockopt ZMQ::SUBSCRIBE, topic
  end

  self
end
start() click to toggle source
# File lib/aggro/zeromq_transport/subscriber.rb, line 23
# Public: Start the subscriber if it is not already running.
#
# While holding the mutex this makes sure the socket exists, launches the
# worker, and then polls every 10 ms until the worker has flagged itself
# as running.
#
# Returns self.
def start
  @mutex.synchronize do
    unless @running
      sub_socket
      start_on_thread

      # The worker sets @running itself; wait here until it has done so.
      sleep 0.01 until @running
    end
  end

  self
end
stop() click to toggle source
# File lib/aggro/zeromq_transport/subscriber.rb, line 36
# Public: Ask the subscriber to stop by clearing its running flag under
# the mutex. Does nothing when the subscriber is not running.
#
# Returns self.
def stop
  @mutex.synchronize { @running = false if @running }

  self
end

Private Instance Methods

handle_message() click to toggle source
# File lib/aggro/zeromq_transport/subscriber.rb, line 48
# Private: Receive one message from the socket into a fresh buffer and
# hand it to the callable, skipping buffers that are not #present?.
#
# Returns the callable's result, or nil when nothing was dispatched.
def handle_message
  buffer = ''
  sub_socket.recv_string buffer

  return unless buffer.present?

  @callable.call buffer
end
start_on_thread() click to toggle source
# File lib/aggro/zeromq_transport/subscriber.rb, line 64
# Private: Post the receive loop to a new single-thread executor.
#
# The posted job registers the socket with a poller, flags the subscriber
# as running, and dispatches messages until the running flag is cleared.
# Socket cleanup happens in an ensure so the socket is released even when
# the callback or the poller raises.
#
# Returns whatever the executor's #post returns.
def start_on_thread
  Concurrent::SingleThreadExecutor.new.post do
    poller = nil

    begin
      poller = ZeroMQ::Poller.new
      poller.register_readable sub_socket

      @running = true

      # Drain every readable message, then re-check the running flag.
      (handle_message while poller.poll(1) > 0) while @running
    ensure
      # @running is deliberately left untouched here: #start polls it to
      # detect that the worker came up, and #stop is what clears it.
      poller.deregister_readable sub_socket if poller

      sub_socket.close
      @sub_socket = nil
    end
  end
end
sub_socket() click to toggle source
# File lib/aggro/zeromq_transport/subscriber.rb, line 55
# Private: The memoized SUB socket, created from the transport's context
# and connected to @endpoint on first use.
#
# Returns the socket.
def sub_socket
  return @sub_socket if @sub_socket

  @sub_socket = ZeroMQTransport.context.socket(ZMQ::SUB).tap do |created|
    created.connect @endpoint
  end
end