class RFlow::Connections::ZMQStreamer

The broker process responsible for shuttling messages back and forth on a many-to-many pipeline link. (Solutions without a broker only allow a 1-to-many or many-to-1 connection.)

Attributes

back[R]
connection[R]
context[R]
front[R]

Public Class Methods

new(config) click to toggle source
Calls superclass method
# File lib/rflow/connections/zmq_connection.rb, line 152
# Set up a broker for the connection carried by +config+.
#
# The connection is stored on the instance, and the superclass is
# initialized with a name derived from the connection's name plus the
# literal label 'Broker'.
#
# @param config [#connection] object exposing the connection to broker
def initialize(config)
  @connection = config.connection
  broker_name = "broker-#{connection.name}"
  super(broker_name, 'Broker')
end

Public Instance Methods

run_process() click to toggle source

Starts the broker process and returns when things are shutting down. The return value is not meaningful (`@return [void]`).

# File lib/rflow/connections/zmq_connection.rb, line 159
# Start the broker process. Returns when things are shutting down.
#
# Creates a ZeroMQ context, builds a front socket matching the connection's
# 'output_socket_type' and a back socket matching its 'input_socket_type',
# binds them to 'output_address' and 'input_address' respectively, and then
# proxies between the two. Every exception raised here (including the
# ArgumentError for an unknown socket type) is logged instead of being
# re-raised, and the sockets and context are released on the way out.
#
# @return [void]
def run_process
  zmq_version = LibZMQ::version
  RFlow.logger.debug { "Creating a new ZeroMQ context; ZeroMQ version is #{zmq_version[:major]}.#{zmq_version[:minor]}.#{zmq_version[:patch]}" }
  @context = ZMQ::Context.new

  options = connection.options
  RFlow.logger.debug { "Connecting message broker to route from #{options['output_address']} to #{options['input_address']}" }

  # The front socket is the counterpart of whatever the upstream side uses.
  output_type = options['output_socket_type']
  front_kind = case output_type
               when 'PUSH' then ZMQ::PULL
               when 'PUB' then ZMQ::XSUB
               else raise ArgumentError, "Unknown output socket type #{output_type}"
               end
  @front = context.socket(front_kind)

  # The back socket is the counterpart of whatever the downstream side uses.
  input_type = options['input_socket_type']
  back_kind = case input_type
              when 'PULL' then ZMQ::PUSH
              when 'SUB' then ZMQ::XPUB
              else raise ArgumentError, "Unknown input socket type #{input_type}"
              end
  @back = context.socket(back_kind)

  front.bind(options['output_address'])
  back.bind(options['input_address'])

  # Presumably Proxy.new blocks while forwarding; a fresh proxy is started
  # whenever it returns. The only way out of this loop is an exception.
  while true
    ZMQ::Proxy.new(front, back)
  end
rescue Exception => error
  # NOTE(review): rescuing Exception also captures signals and SystemExit;
  # this looks deliberate so the method can return on shutdown -- confirm
  # before narrowing it.
  RFlow.logger.error "Error running message broker: #{error.class}: #{error.message}, because: #{error.backtrace.inspect}"
ensure
  # Release whatever was actually created, back socket first.
  [back, front].compact.each(&:close)
  context.terminate unless context.nil?
end