class ZMQ::Socket

Attributes

name[R]
socket[R]

Public Class Methods

create(context_ptr, type, opts = {:receiver_class => ZMQ::Message}) click to toggle source

Allocates a socket of type type for sending and receiving data.

type can be one of ZMQ::REQ, ZMQ::REP, ZMQ::PUB, ZMQ::SUB, ZMQ::PAIR, ZMQ::PULL, ZMQ::PUSH, ZMQ::XREQ, ZMQ::REP, ZMQ::DEALER or ZMQ::ROUTER.

By default, this class uses ZMQ::Message for manual memory management. For automatic garbage collection of received messages, it is possible to override the :receiver_class to use ZMQ::ManagedMessage.

sock = Socket.create(Context.create, ZMQ::REQ, :receiver_class => ZMQ::ManagedMessage)

Advanced users may want to replace the receiver class with their own custom class. The custom class must conform to the same public API as ZMQ::Message.

Creation of a new Socket object can return nil when socket creation fails.

if (socket = Socket.new(context.pointer, ZMQ::REQ))
  ...
else
  STDERR.puts "Socket creation failed"
end
# File lib/ffi-rzmq/socket.rb, line 32
def self.create context_ptr, type, opts = {:receiver_class => ZMQ::Message}
  new(context_ptr, type, opts) rescue nil
end
new(context_ptr, type, opts = {:receiver_class => ZMQ::Message}) click to toggle source

To avoid rescuing exceptions, use the factory method create for all socket creation.

Allocates a socket of type type for sending and receiving data.

type can be one of ZMQ::REQ, ZMQ::REP, ZMQ::PUB, ZMQ::SUB, ZMQ::PAIR, ZMQ::PULL, ZMQ::PUSH, ZMQ::XREQ, ZMQ::REP, ZMQ::DEALER or ZMQ::ROUTER.

By default, this class uses ZMQ::Message for manual memory management. For automatic garbage collection of received messages, it is possible to override the :receiver_class to use ZMQ::ManagedMessage.

sock = Socket.new(Context.new, ZMQ::REQ, :receiver_class => ZMQ::ManagedMessage)

Advanced users may want to replace the receiver class with their own custom class. The custom class must conform to the same public API as ZMQ::Message.

Creation of a new Socket object can raise an exception. This occurs when the context_ptr is null or when the allocation of the 0mq socket within the context fails.

begin
  socket = Socket.new(context.pointer, ZMQ::REQ)
rescue ContextError => e
  # error handling
end
# File lib/ffi-rzmq/socket.rb, line 65
def initialize context_ptr, type, opts = {:receiver_class => ZMQ::Message}
  # users may override the classes used for receiving; class must conform to the
  # same public API as ZMQ::Message
  @receiver_klass = opts[:receiver_class]

  context_ptr = context_ptr.pointer if context_ptr.kind_of?(ZMQ::Context)

  if context_ptr.nil? || context_ptr.null?
    raise ContextError.new 'zmq_socket', 0, ETERM, "Context pointer was null"
  else
    @socket = LibZMQ.zmq_socket context_ptr, type
    if @socket && !@socket.null?
      @name = SocketTypeNameMap[type]
    else
      raise ContextError.new 'zmq_socket', 0, ETERM, "Socket pointer was null"
    end
  end

  @longlong_cache = @int_cache = nil
  @more_parts_array = []
  @option_lookup = []
  populate_option_lookup

  define_finalizer
end

Private Class Methods

close(socket, pid) click to toggle source
# File lib/ffi-rzmq/socket.rb, line 606
def self.close socket, pid
  Proc.new { LibZMQ.zmq_close socket if Process.pid == pid }
end

Public Instance Methods

bind(address) click to toggle source

Binds the socket to an address.

socket.bind("tcp://127.0.0.1:5555")
# File lib/ffi-rzmq/socket.rb, line 178
def bind address
  LibZMQ.zmq_bind @socket, address
end
close() click to toggle source

Closes the socket. Any unprocessed messages in queue are sent or dropped depending upon the value of the socket option ZMQ::LINGER.

Returns 0 upon success or when the socket has already been closed. Returns -1 when the operation fails. Check ZMQ::Util.errno for the error code.

rc = socket.close
puts("Given socket was invalid!") unless 0 == rc
# File lib/ffi-rzmq/socket.rb, line 199
def close
  if @socket
    remove_finalizer
    rc = LibZMQ.zmq_close @socket
    @socket = nil
    release_cache
    rc
  else
    0
  end
end
connect(address) click to toggle source

Connects the socket to an address.

rc = socket.connect("tcp://127.0.0.1:5555")
# File lib/ffi-rzmq/socket.rb, line 186
def connect address
  rc = LibZMQ.zmq_connect @socket, address
end
disconnect(endpoint) click to toggle source

Disconnect the socket from the given endpoint.

# File lib/ffi-rzmq/socket.rb, line 490
def disconnect(endpoint)
  LibZMQ.zmq_disconnect(socket, endpoint)
end
getsockopt(name, array) click to toggle source

Get the options set on this socket.

name determines the socket option to request array should be an empty array; a result of the proper type (numeric, string, boolean) will be inserted into the first position.

Valid option_name values:

ZMQ::RCVMORE - true or false
ZMQ::HWM - integer
ZMQ::SWAP - integer
ZMQ::AFFINITY - bitmap in an integer
ZMQ::IDENTITY - string
ZMQ::RATE - integer
ZMQ::RECOVERY_IVL - integer
ZMQ::SNDBUF - integer
ZMQ::RCVBUF - integer
ZMQ::FD     - fd in an integer
ZMQ::EVENTS - bitmap integer
ZMQ::LINGER - integer measured in milliseconds
ZMQ::RECONNECT_IVL - integer measured in milliseconds
ZMQ::BACKLOG - integer
ZMQ::RECOVER_IVL_MSEC - integer measured in milliseconds
ZMQ::IPV4ONLY - integer

Returns 0 when the operation completed successfully. Returns -1 when this operation failed.

With a -1 return code, the user must check ZMQ::Util.errno to determine the cause.

# retrieve high water mark
array = []
rc = socket.getsockopt(ZMQ::HWM, array)
hwm = array.first if ZMQ::Util.resultcode_ok?(rc)
# File lib/ffi-rzmq/socket.rb, line 463
def getsockopt name, array
  rc = __getsockopt__ name, array

  if Util.resultcode_ok?(rc) && (RCVMORE == name)
    # convert to boolean
    array[0] = 1 == array[0]
  end

  rc
end
identity() click to toggle source

Convenience method for getting the value of the socket IDENTITY.

# File lib/ffi-rzmq/socket.rb, line 476
def identity
  array = []
  getsockopt IDENTITY, array
  array.at(0)
end
identity=(value) click to toggle source

Convenience method for setting the value of the socket IDENTITY.

# File lib/ffi-rzmq/socket.rb, line 484
def identity=(value)
  setsockopt IDENTITY, value.to_s
end
more_parts?() click to toggle source

Convenience method for checking on additional message parts.

Equivalent to calling #getsockopt with ZMQ::RCVMORE.

Warning: if the call to getsockopt fails, this method will return false and swallow the error.

message_parts = []
message = Message.new
rc = socket.recvmsg(message)
if ZMQ::Util.resultcode_ok?(rc)
  message_parts << message
  while more_parts?
    message = Message.new
    rc = socket.recvmsg(message)
    message_parts.push(message) if resulcode_ok?(rc)
  end
end
# File lib/ffi-rzmq/socket.rb, line 168
def more_parts?
  rc = getsockopt ZMQ::RCVMORE, @more_parts_array

  Util.resultcode_ok?(rc) ? @more_parts_array.at(0) : false
end
recv_multipart(list, routing_envelope, flag = 0) click to toggle source

Should only be used for XREQ, XREP, DEALER and ROUTER type sockets. Takes a list for receiving the message body parts and a routing_envelope for receiving the message parts comprising the 0mq routing information.

# File lib/ffi-rzmq/socket.rb, line 408
def recv_multipart list, routing_envelope, flag = 0
  parts = []
  rc = recvmsgs parts, flag

  if Util.resultcode_ok?(rc)
    routing = true
    parts.each do |part|
      if routing
        routing_envelope << part
        routing = part.size > 0
      else
        list << part
      end
    end
  end

  rc
end
recv_string(string, flags = 0) click to toggle source

Helper method to make a new #Message instance and convert its payload to a string.

flags may be ZMQ::DONTWAIT.

Returns 0 when the message was successfully dequeued. Returns -1 under two conditions.

  1. The message could not be dequeued

  2. When flags is set with ZMQ::DONTWAIT and the socket returned EAGAIN.

With a -1 return code, the user must check ZMQ::Util.errno to determine the cause.

The application code is responsible for handling the message object lifecycle when recv returns an error code.

# File lib/ffi-rzmq/socket.rb, line 340
def recv_string string, flags = 0
  message = @receiver_klass.new
  rc = recvmsg message, flags
  string.replace(message.copy_out_string) if Util.resultcode_ok?(rc)
  message.close
  rc
end
recv_strings(list, flag = 0) click to toggle source

Receive a multipart message as a list of strings.

flag may be ZMQ::DONTWAIT. Any other flag will be removed.

# File lib/ffi-rzmq/socket.rb, line 353
def recv_strings list, flag = 0
  array = []
  rc = recvmsgs array, flag

  if Util.resultcode_ok?(rc)
    array.each do |message|
      list << message.copy_out_string
      message.close
    end
  end

  rc
end
recvmsg(message, flags = 0) click to toggle source

Dequeues a message from the underlying queue. By default, this is a blocking operation.

flags may take two values:

0 (default) - blocking operation
ZMQ::DONTWAIT - non-blocking operation

Returns 0 when the message was successfully dequeued. Returns -1 under two conditions.

  1. The message could not be dequeued

  2. When flags is set with ZMQ::DONTWAIT and the socket returned EAGAIN.

With a -1 return code, the user must check ZMQ::Util.errno to determine the cause.

The application code is responsible for handling the message object lifecycle when recv returns an error code.

# File lib/ffi-rzmq/socket.rb, line 319
def recvmsg message, flags = 0
  #LibZMQ.zmq_recvmsg @socket, message.address, flags
  __recvmsg__(@socket, message.address, flags)
end
recvmsgs(list, flag = 0) click to toggle source

Receive a multipart message as an array of objects (by default these are instances of Message).

flag may be ZMQ::DONTWAIT. Any other flag will be removed.

# File lib/ffi-rzmq/socket.rb, line 373
def recvmsgs list, flag = 0
  flag = DONTWAIT if dontwait?(flag)

  message = @receiver_klass.new
  rc = recvmsg message, flag

  if Util.resultcode_ok?(rc)
    list << message

    # check rc *first*; necessary because the call to #more_parts? can reset
    # the zmq_errno to a weird value, so the zmq_errno that was set on the
    # call to #recv gets lost
    while Util.resultcode_ok?(rc) && more_parts?
      message = @receiver_klass.new
      rc = recvmsg message, flag

      if Util.resultcode_ok?(rc)
        list << message
      else
        message.close
        list.each { |msg| msg.close }
        list.clear
      end
    end
  else
    message.close
  end

  rc
end
send_and_close(message, flags = 0) click to toggle source

Sends a message. This will automatically close the message for both successful and failed sends.

Returns 0 when the message was successfully enqueued. Returns -1 under two conditions.

  1. The message could not be enqueued

  2. When flags is set with ZMQ::DONTWAIT and the socket returned EAGAIN.

With a -1 return code, the user must check ZMQ::Util.errno to determine the cause.

# File lib/ffi-rzmq/socket.rb, line 296
def send_and_close message, flags = 0
  rc = sendmsg message, flags
  message.close
  rc
end
send_string(string, flags = 0) click to toggle source

Helper method to make a new #Message instance out of the string passed in for transmission.

flags may be ZMQ::DONTWAIT and ZMQ::SNDMORE.

Returns 0 when the message was successfully enqueued. Returns -1 under two conditions.

  1. The message could not be enqueued

  2. When flags is set with ZMQ::DONTWAIT and the socket returned EAGAIN.

With a -1 return code, the user must check ZMQ::Util.errno to determine the cause.

# File lib/ffi-rzmq/socket.rb, line 244
def send_string string, flags = 0
  message = Message.new string
  send_and_close message, flags
end
send_strings(parts, flags = 0) click to toggle source

Send a sequence of strings as a multipart message out of the parts passed in for transmission. Every element of parts should be a String.

flags may be ZMQ::DONTWAIT.

Returns 0 when the messages were successfully enqueued. Returns -1 under two conditions.

  1. A message could not be enqueued

  2. When flags is set with ZMQ::DONTWAIT and the socket returned EAGAIN.

With a -1 return code, the user must check ZMQ::Util.errno to determine the cause.

# File lib/ffi-rzmq/socket.rb, line 263
def send_strings parts, flags = 0
  send_multiple(parts, flags, :send_string)
end
sendmsg(message, flags = 0) click to toggle source

Queues the message for transmission. Message is assumed to conform to the same public API as #Message.

flags may take two values:

  • 0 (default) - blocking operation

  • ZMQ::DONTWAIT - non-blocking operation

  • ZMQ::SNDMORE - this message is part of a multi-part message

Returns 0 when the message was successfully enqueued. Returns -1 under two conditions.

  1. The message could not be enqueued

  2. When flags is set with ZMQ::DONTWAIT and the socket returned EAGAIN.

With a -1 return code, the user must check ZMQ::Util.errno to determine the cause.

# File lib/ffi-rzmq/socket.rb, line 227
def sendmsg message, flags = 0
  __sendmsg__(@socket, message.address, flags)
end
sendmsgs(parts, flags = 0) click to toggle source

Send a sequence of messages as a multipart message out of the parts passed in for transmission. Every element of parts should be a Message (or subclass).

flags may be ZMQ::DONTWAIT.

Returns 0 when the messages were successfully enqueued. Returns -1 under two conditions.

  1. A message could not be enqueued

  2. When flags is set with ZMQ::DONTWAIT and the socket returned EAGAIN.

With a -1 return code, the user must check ZMQ::Util.errno to determine the cause.

# File lib/ffi-rzmq/socket.rb, line 281
def sendmsgs parts, flags = 0
  send_multiple(parts, flags, :sendmsg)
end
setsockopt(name, value, length = nil) click to toggle source

Set the queue options on this socket.

Valid name values that take a numeric value are:

ZMQ::HWM
ZMQ::SWAP (version 2 only)
ZMQ::AFFINITY
ZMQ::RATE
ZMQ::RECOVERY_IVL
ZMQ::MCAST_LOOP (version 2 only)
ZMQ::LINGER
ZMQ::RECONNECT_IVL
ZMQ::BACKLOG
ZMQ::RECOVER_IVL_MSEC (version 2 only)
ZMQ::RECONNECT_IVL_MAX (version 3 only)
ZMQ::MAXMSGSIZE (version 3 only)
ZMQ::SNDHWM (version 3 only)
ZMQ::RCVHWM (version 3 only)
ZMQ::MULTICAST_HOPS (version 3 only)
ZMQ::RCVTIMEO (version 3 only)
ZMQ::SNDTIMEO (version 3 only)

Valid name values that take a string value are:

ZMQ::IDENTITY (version 2/3 only)
ZMQ::SUBSCRIBE
ZMQ::UNSUBSCRIBE

Returns 0 when the operation completed successfully. Returns -1 when this operation failed.

With a -1 return code, the user must check ZMQ::Util.errno to determine the cause.

rc = socket.setsockopt(ZMQ::LINGER, 1_000)
ZMQ::Util.resultcode_ok?(rc) ? puts("succeeded") : puts("failed")
# File lib/ffi-rzmq/socket.rb, line 126
def setsockopt name, value, length = nil
  if 1 == @option_lookup[name]
    length = 8
    pointer = LibC.malloc length
    pointer.write_long_long value

  elsif 0 == @option_lookup[name]
    length = 4
    pointer = LibC.malloc length
    pointer.write_int value

  elsif 2 == @option_lookup[name]
    # Strings are treated as pointers by FFI so we'll just pass it through
    length ||= value.size
    pointer = value

  end

  rc = LibZMQ.zmq_setsockopt @socket, name, pointer, length
  LibC.free(pointer) unless pointer.is_a?(String) || pointer.nil? || pointer.null?
  rc
end
unbind(endpoint) click to toggle source

Unbind the socket from the given endpoint.

# File lib/ffi-rzmq/socket.rb, line 496
def unbind(endpoint)
  LibZMQ.zmq_unbind(socket, endpoint)
end

Private Instance Methods

__getsockopt__(name, array) click to toggle source
# File lib/ffi-rzmq/socket.rb, line 519
def __getsockopt__ name, array
  # a small optimization so we only have to determine the option
  # type a single time; gives approx 5% speedup to do it this way.
  option_type = @option_lookup[name]

  value, length = sockopt_buffers option_type

  rc = LibZMQ.zmq_getsockopt @socket, name, value, length

  if Util.resultcode_ok?(rc)
    array[0] = if 1 == option_type
      value.read_long_long
    elsif 0 == option_type
      value.read_int
    elsif 2 == option_type
      value.read_string(length.read_int)
    end
  end

  rc
end
__recvmsg__(socket, address, flags) click to toggle source
# File lib/ffi-rzmq/socket.rb, line 583
def __recvmsg__(socket, address, flags)
  LibZMQ.zmq_recvmsg(socket, address, flags)
end
__sendmsg__(socket, address, flags) click to toggle source
# File lib/ffi-rzmq/socket.rb, line 579
def __sendmsg__(socket, address, flags)
  LibZMQ.zmq_sendmsg(socket, address, flags)
end
alloc_pointer(kind, length) click to toggle source
# File lib/ffi-rzmq/socket.rb, line 573
def alloc_pointer(kind, length)
  pointer = FFI::MemoryPointer.new :size_t
  pointer.write_int(length)
  [FFI::MemoryPointer.new(kind), pointer]
end
define_finalizer() click to toggle source

these finalizer-related methods cannot live in the CommonSocketBehavior module; they must be in the class definition directly

# File lib/ffi-rzmq/socket.rb, line 598
def define_finalizer
  ObjectSpace.define_finalizer(self, self.class.close(@socket, Process.pid))
end
dontwait?(flags) click to toggle source
# File lib/ffi-rzmq/socket.rb, line 568
def dontwait?(flags)
  (DONTWAIT & flags) == DONTWAIT
end
Also aliased as: noblock?
noblock?(flags)
Alias for: dontwait?
populate_option_lookup() click to toggle source
# File lib/ffi-rzmq/socket.rb, line 587
def populate_option_lookup
  IntegerSocketOptions.each { |option| @option_lookup[option] = 0 }

  LongLongSocketOptions.each { |option| @option_lookup[option] = 1 }

  StringSocketOptions.each { |option| @option_lookup[option] = 2 }
end
release_cache() click to toggle source
# File lib/ffi-rzmq/socket.rb, line 563
def release_cache
  @longlong_cache = nil
  @int_cache = nil
end
remove_finalizer() click to toggle source
# File lib/ffi-rzmq/socket.rb, line 602
def remove_finalizer
  ObjectSpace.undefine_finalizer self
end
send_multiple(parts, flags, method_name) click to toggle source
# File lib/ffi-rzmq/socket.rb, line 503
def send_multiple(parts, flags, method_name)
  if !parts || parts.empty?
    -1
  else
    flags = DONTWAIT if dontwait?(flags)
    rc = 0

    parts[0..-2].each do |part|
      rc = send(method_name, part, (flags | ZMQ::SNDMORE))
      break unless Util.resultcode_ok?(rc)
    end

    Util.resultcode_ok?(rc) ? send(method_name, parts[-1], flags) : rc
  end
end
sockopt_buffers(option_type) click to toggle source

Calls to ZMQ.getsockopt require us to pass in some pointers. We can cache and save those buffers for subsequent calls. This is a big perf win for calling RCVMORE which happens quite often. Cannot save the buffer for the IDENTITY.

# File lib/ffi-rzmq/socket.rb, line 544
def sockopt_buffers option_type
  if 1 == option_type
    # int64_t or uint64_t
    @longlong_cache ||= alloc_pointer(:int64, 8)

  elsif 0 == option_type
    # int, 0mq assumes int is 4-bytes
    @int_cache ||= alloc_pointer(:int32, 4)

  elsif 2 == option_type
    # could be a string of up to 255 bytes, so allocate for worst case
    alloc_pointer(255, 255)

  else
    # uh oh, someone passed in an unknown option; return nil
    @int_cache ||= alloc_pointer(:int32, 4)
  end
end