module ZeroMQ
Public Instance Methods
zeromq_pull_server(name, endpoint = "ipc://#{IPCDIR}/#{name}.ipc", ctx: :pull, &block)
click to toggle source
This runs an endless receive loop, acting as a “server”.
# File lib/rzmq-enhancement.rb, line 28
#
# Starts a PULL-side server: delegates to grand_server with a ZMQ::PULL
# socket that is bound (not connected) to +endpoint+.
#
# name::     logical name; also used to build the default endpoint.
# endpoint:: address to bind; defaults to an ipc:// path under IPCDIR.
# ctx::      context label forwarded to grand_server (default :pull).
# block::    forwarded to grand_server, which calls it with each parsed
#            message inside an endless loop.
#
# Returns whatever grand_server returns.
def zeromq_pull_server(name, endpoint = "ipc://#{IPCDIR}/#{name}.ipc", ctx: :pull, &block)
  server_opts = { ctx: ctx, bind: true }
  grand_server(ZMQ::PULL, name, endpoint, **server_opts, &block)
end
zeromq_push(name, endpoint = "ipc://#{IPCDIR}/#{name}.ipc", ctx: :push, payload: nil, &block)
click to toggle source
This bit will seem confusing, but we must redo grand_pusher
to make this more sane.
# File lib/rzmq-enhancement.rb, line 15
#
# Pushes a message through grand_pusher using a ZMQ::PUSH socket.
#
# name::     logical name; also used to build the default endpoint.
# endpoint:: address handed to grand_pusher; defaults to an ipc:// path
#            under IPCDIR.
# ctx::      context label forwarded to grand_pusher (default :push).
# payload::  value to send when no block is supplied.
# block::    optional; when given it is forwarded to grand_pusher together
#            with +payload+.
#
# Without a block, +payload+ is wrapped in a block so that grand_pusher
# obtains the value to send by calling it.
#
# Returns whatever grand_pusher returns.
def zeromq_push(name, endpoint = "ipc://#{IPCDIR}/#{name}.ipc", ctx: :push, payload: nil, &block)
  # No caller block: supply the payload through a block of our own.
  return grand_pusher(ZMQ::PUSH, name, endpoint, ctx: ctx) { payload } unless block_given?

  grand_pusher(ZMQ::PUSH, name, endpoint, ctx: ctx, payload: payload, &block)
end
zeromq_request(name, endpoint = "ipc://#{IPCDIR}/#{name}.ipc", **opts, &block)
click to toggle source
We make the request; when a payload: option is supplied, the parsed response is passed to the block.
# File lib/rzmq-enhancement.rb, line 36
#
# Issues a request through grand_pusher using a ZMQ::REQ socket.
#
# name::     logical name; also used to build the default endpoint.
# endpoint:: address handed to grand_pusher; defaults to an ipc:// path
#            under IPCDIR.
# opts::     keyword options forwarded unchanged to grand_pusher
#            (it reads :ctx and :payload).
# block::    forwarded to grand_pusher. When opts[:payload] is set,
#            grand_pusher sends that payload and calls the block with the
#            parsed reply; otherwise the block's return value is what
#            gets sent.
#
# Returns whatever grand_pusher returns (its cached per-context struct),
# not the reply itself.
def zeromq_request(name, endpoint = "ipc://#{IPCDIR}/#{name}.ipc", **opts, &block)
  grand_pusher(ZMQ::REQ, name, endpoint, **opts, &block)
end
zeromq_response_server(name, endpoint = "ipc://#{IPCDIR}/#{name}.ipc", ctx: :default, &block)
click to toggle source
# File lib/rzmq-enhancement.rb, line 43
#
# Starts a REP-side server: delegates to grand_server with a ZMQ::REP
# socket bound to +endpoint+ and with replies enabled, so the block's
# return value is sent back for every message received.
#
# name::     logical name; also used to build the default endpoint.
# endpoint:: address to bind; defaults to an ipc:// path under IPCDIR.
# ctx::      context label forwarded to grand_server (default :default).
# block::    forwarded to grand_server, which calls it with each parsed
#            request.
#
# Returns whatever grand_server returns.
def zeromq_response_server(name, endpoint = "ipc://#{IPCDIR}/#{name}.ipc", ctx: :default, &block)
  server_opts = { bind: true, respond: true, ctx: ctx }
  grand_server(ZMQ::REP, name, endpoint, **server_opts, &block)
end
Private Instance Methods
ctx_name(name, opts)
click to toggle source
# File lib/rzmq-enhancement.rb, line 51
#
# Builds the symbol used as the key into the context cache.
#
# name:: logical name of the endpoint.
# opts:: options hash; only opts[:ctx] is read.
#
# Returns the symbol :"<name>.<ctx>", where <ctx> falls back to :default
# when opts[:ctx] is nil or false.
def ctx_name(name, opts)
  label = opts[:ctx] || :default
  "#{name}.#{label}".to_sym
end
error_check(rc)
click to toggle source
# File lib/rzmq-enhancement.rb, line 126
#
# Validates a ZeroMQ result code.
#
# rc:: result code returned by a ZeroMQ call.
#
# Returns false when ZMQ::Util.resultcode_ok? accepts +rc+.
# Raises RuntimeError carrying ZMQ::Util.errno and ZMQ::Util.error_string
# otherwise.
def error_check(rc)
  return false if ZMQ::Util.resultcode_ok?(rc)

  raise "ZeroMQ Operation failed, errno [#{ZMQ::Util.errno}] description [#{ZMQ::Util.error_string}]"
end
grand_pusher(type, name, endpoint, **opts, &block)
click to toggle source
TODO: We don't handle the non-block req case at all. Do we want to? TODO: We need to rewrite this, it works as is, but is too tricky TODO: to get the semantics right.
# File lib/rzmq-enhancement.rb, line 59
#
# Shared client-side helper: lazily creates (and caches per context name)
# a ZeroMQ context plus a connected socket of +type+, then optionally
# sends one message.
#
# type::     ZeroMQ socket type constant (callers pass ZMQ::PUSH / ZMQ::REQ).
# name::     logical name; combined with opts[:ctx] to form the cache key.
# endpoint:: address the socket connects to on first use.
# opts::     :ctx     - context label used in the cache key.
#            :payload - when truthy (and a block is given) this value is
#                       sent and the reply is handed to the block.
# block::    optional. Without opts[:payload] it is called with the
#            ZeroMQ context and its return value is sent as JSON. With
#            opts[:payload] it is called with the JSON-parsed reply.
#
# With no block nothing is sent; the socket is only set up.
#
# Returns the cached struct (responds to +ctx+ and +push_sock+).
# Raises RuntimeError (via error_check) when a ZeroMQ call fails.
def grand_pusher(type, name, endpoint, **opts, &block)
  init_sys
  ctxname = ctx_name(name, opts)

  h = @ctxh[ctxname]
  if h.nil?
    h = OpenStruct.new
    h.ctx = ZMQ::Context.create(1)
    h.push_sock = h.ctx.socket(type)
    error_check(h.push_sock.setsockopt(ZMQ::LINGER, 0))
    error_check(h.push_sock.connect(endpoint))
    # Cache only once setup has fully succeeded; otherwise a failed
    # setsockopt/connect would leave a half-initialised entry that every
    # later call would silently reuse.
    @ctxh[ctxname] = h
  end

  if block_given?
    if opts[:payload]
      # Send the supplied payload, then hand the parsed reply to the block.
      error_check(h.push_sock.send_string(JSON.generate(opts[:payload])))
      result = ''
      error_check(h.push_sock.recv_string(result))
      block.(JSON.parse(result))
    else
      # The block produces the payload; no reply is read in this branch.
      payload = block.(h.ctx)
      error_check(h.push_sock.send_string(JSON.generate(payload)))
    end
  end

  h
end
grand_server(type, name, endpoint, **opts, &block)
click to toggle source
# File lib/rzmq-enhancement.rb, line 93
#
# Shared server-side helper: creates a ZeroMQ context and a socket of
# +type+, binds or connects it, and (when a block is given) loops forever
# receiving JSON messages and passing each parsed message to the block.
#
# type::     ZeroMQ socket type constant (callers pass ZMQ::PULL / ZMQ::REP).
# name::     logical name; combined with opts[:ctx] to form the cache key.
# endpoint:: address to bind or connect to.
# opts::     :ctx     - context label used in the cache key.
#            :bind    - truthy to bind, otherwise the socket connects.
#            :respond - truthy to send the block's return value back as
#                       JSON after each message.
# block::    called with each JSON-parsed message.
#
# Returns the cached struct (responds to +ctx+ and +server_sock+); with a
# block this is only reached if the loop is exited.
# Raises RuntimeError (via error_check) when a ZeroMQ call fails,
# including a failed reply send.
def grand_server(type, name, endpoint, **opts, &block)
  init_sys
  ctxname = ctx_name(name, opts)

  # NOTE(review): a fresh context and socket are created on every call,
  # replacing whatever the cached struct held before - confirm intended.
  h = (@ctxh[ctxname] ||= OpenStruct.new)
  h.ctx = ZMQ::Context.create(1)
  h.server_sock = h.ctx.socket(type)
  error_check(h.server_sock.setsockopt(ZMQ::LINGER, 0))

  rc = if opts[:bind]
         h.server_sock.bind(endpoint)
       else
         h.server_sock.connect(endpoint)
       end
  error_check(rc)

  if block_given?
    loop do
      payload = ''
      error_check(h.server_sock.recv_string(payload))
      result = block.(JSON.parse(payload))
      next unless opts[:respond]

      # Check the reply send like every other socket call instead of
      # silently dropping a failed response.
      error_check(h.server_sock.send_string(JSON.generate(result)))
    end
  end

  h
end
init_sys()
click to toggle source
# File lib/rzmq-enhancement.rb, line 121
#
# Ensures the context cache (@ctxh) exists, creating an empty Hash on
# first use.
#
# Returns the cache hash.
def init_sys
  return @ctxh if @ctxh

  @ctxh = {}
end