module ZeroMQ

Public Instance Methods

zeromq_pull_server(name, endpoint = "ipc:// click to toggle source

this does an endless loop as a “server”

# File lib/rzmq-enhancement.rb, line 28
def zeromq_pull_server name,
                       endpoint = "ipc://#{IPCDIR}/#{name}.ipc",
                       ctx: :pull,
                       &block
  grand_server ZMQ::PULL, name, endpoint, ctx: ctx, bind: true, &block
end
zeromq_push(name, endpoint = "ipc:// click to toggle source

This bit will seem confusing, but we must redo grand_pusher to to make this more sane.

# File lib/rzmq-enhancement.rb, line 15
def zeromq_push name,
                endpoint = "ipc://#{IPCDIR}/#{name}.ipc",
                ctx: :push,
                payload: nil,
                &block
  if block_given?
    grand_pusher ZMQ::PUSH, name, endpoint, ctx: ctx, payload: payload, &block
  else
    grand_pusher(ZMQ::PUSH, name, endpoint, ctx: ctx) { payload }
  end
end
zeromq_request(name, endpoint = "ipc:// click to toggle source

we make the request and return the response

# File lib/rzmq-enhancement.rb, line 36
def zeromq_request name,
                   endpoint = "ipc://#{IPCDIR}/#{name}.ipc",
                   **opts,
                   &block
  h = grand_pusher ZMQ::REQ, name, endpoint, **opts, &block
end
zeromq_response_server(name, endpoint = "ipc:// click to toggle source
# File lib/rzmq-enhancement.rb, line 43
def zeromq_response_server name,
                           endpoint = "ipc://#{IPCDIR}/#{name}.ipc",
                           ctx: :default,
                           &block
  grand_server ZMQ::REP, name, endpoint, bind: true, respond: true, ctx: ctx, &block
end

Private Instance Methods

ctx_name(name, opts) click to toggle source
# File lib/rzmq-enhancement.rb, line 51
def ctx_name name, opts
  :"#{name}.#{opts[:ctx] || :default}"
end
error_check(rc) click to toggle source
# File lib/rzmq-enhancement.rb, line 126
def error_check rc
  if ZMQ::Util.resultcode_ok?(rc)
    false
  else
    raise "ZeroMQ Operation failed, errno [#{ZMQ::Util.errno}] description [#{ZMQ::Util.error_string}]"
  end
end
grand_pusher(type, name, endpoint, **opts, &block) click to toggle source

TODO: We don't handle the non-block req case at all. Do we want to? TODO: We need to rewrite this, it works as is, but is too tricky TODO: to get the semantics right.

# File lib/rzmq-enhancement.rb, line 59
def grand_pusher type, name, endpoint, **opts, &block
  init_sys
  ctxname = ctx_name(name,opts)
  h = if @ctxh[ctxname].nil?
        h = (@ctxh[ctxname] ||= OpenStruct.new)
        h.ctx = ZMQ::Context.create(1)
        h.push_sock = h.ctx.socket(type)
        error_check(h.push_sock.setsockopt(ZMQ::LINGER, 0))
        rc = h.push_sock.connect(endpoint)
        error_check(rc)
        h
      else
        @ctxh[ctxname]
      end

  if block_given?
    unless opts[:payload]
      # here, we get the payload from the block
      payload = block.(h.ctx)
      rc = h.push_sock.send_string(JSON.generate(payload))
      error_check(rc)
    else
      # here, we call the block with the results
      rc = h.push_sock.send_string(JSON.generate(opts[:payload]))
      error_check(rc)
      rc = h.push_sock.recv_string(result = '')
      error_check(rc)
      block.(JSON.parse(result))
    end
  end
  h
end
grand_server(type, name, endpoint, **opts, &block) click to toggle source
# File lib/rzmq-enhancement.rb, line 93
def grand_server type, name, endpoint, **opts, &block
  init_sys
  ctxname = ctx_name(name,opts)
  h = (@ctxh[ctxname] ||= OpenStruct.new)
  h.ctx = ZMQ::Context.create(1)

  h.server_sock = h.ctx.socket(type)
  error_check(h.server_sock.setsockopt(ZMQ::LINGER, 0))
  rc = if opts[:bind]
         h.server_sock.bind(endpoint)
       else
         h.server_sock.connect(endpoint)
       end
  error_check(rc)

  loop do
    rc = h.server_sock.recv_string payload = ''
    error_check(rc)

    result = block.(JSON.parse(payload))
    if opts[:respond]
      rc = h.server_sock.send_string JSON.generate(result)
    end
  end if block_given?
  h
end
init_sys() click to toggle source
# File lib/rzmq-enhancement.rb, line 121
def init_sys
  @ctxh ||= {}
end