class CZTop::Actor

Represents a CZMQ::FFI::Zactor.

About Thread-Safety

The instance methods of this class are thread-safe, so it's safe to call {#<<}, {#request} or even {#terminate} from different threads. Caution: Use only these methods to communicate with the low-level zactor. Don't call {Message#send_to} directly to send a message to an {Actor} instance, as that wouldn't be thread-safe.

About termination

Actors should be terminated explicitly, either by calling {#terminate} from the current process or sending them the “$TERM” command (from outside). Not terminating them explicitly might make the process block at exit.

@example Simple Actor with Ruby block

result = ""
a = CZTop::Actor.new do |msg, pipe|
  case msg[0]
  when "foo"
    pipe << "bar"
  when "append"
    result << msg[1].to_s
  when "result"
    pipe << result
  end
end
a.request("foo")[0] #=> "bar"
a.request("foo")[0] #=> "bar"
a << ["append", "baz"] << ["append", "baz"]
a.request("result")[0] #=> "bazbaz"

@see api.zeromq.org/czmq3-0:zactor

Constants

TERM

the command which causes an actor handler to terminate

Attributes

exception[R]

@return [Exception] the exception that crashed this actor, if any

Public Class Methods

new(callback = nil, c_args = nil, &handler)

Creates a new actor. Either pass a callback directly or a block. The block will be called for every received message.

In case the given callback is an FFI::Pointer (to a C function), it's used as-is. It is expected to do the handshake (signal) itself.

@param callback [FFI::Pointer, Proc, call] pointer to a C function or just anything callable
@param c_args [FFI::Pointer, nil] args, only useful if callback is an FFI::Pointer
@yieldparam message [Message]
@yieldparam pipe [Socket::PAIR]
@see process_messages

# File lib/cztop/actor.rb, line 62
def initialize(callback = nil, c_args = nil, &handler)
  @running = true
  @mtx = Mutex.new
  @callback = callback || handler
  @callback = shim(@callback) unless @callback.is_a? ::FFI::Pointer
  ffi_delegate = Zactor.new(@callback, c_args)
  attach_ffi_delegate(ffi_delegate)
  options.sndtimeo = 20 # ms; see #<<
end

Public Instance Methods

<<(message)

Send a message to the actor.

@param message [Object] message to send to the actor, see {Message.coerce}
@return [self] so it's chainable
@raise [DeadActorError] if actor is terminated
@raise [IO::EAGAINWaitWritable, RuntimeError] anything that could be raised by {Message#send_to}
@note Normally this method is asynchronous, but if the message is "$TERM", it blocks until the actor is terminated.

# File lib/cztop/actor.rb, line 80
def <<(message)
  message = Message.coerce(message)

  if TERM == message[0]
    # NOTE: can't just send this to the actor. The sender might call
    # #terminate immediately, which most likely causes a hang due to race
    # conditions.
    terminate
  else
    begin
      @mtx.synchronize do
        raise DeadActorError if not @running
        message.send_to(self)
      end
    rescue IO::EAGAINWaitWritable
      # The sndtimeo has been reached.
      #
      # This should fix the race condition (mainly on JRuby) between
      # @running not being set to false yet but the actor handler already
      # terminating and thus not able to receive messages anymore.
      #
      # This shouldn't result in an infinite loop, since it'll stop as
      # soon as @running is set to false by #signal_shimmed_handler_death,
      # at least when using a Ruby handler.
      #
      # In case of a C function handler, it MUST NOT crash and only
      # terminate when being sent the "$TERM" message using #terminate (so
      # #await_handler_death can set @running to false).
      retry
    end
  end
  self
end
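
The retry-on-timeout pattern used by `#<<` can be tried out without ZeroMQ. The following is a minimal sketch using only the Ruby standard library. `FlakyPipe`, `DeadError` and `send_with_retry` are hypothetical names made up for this illustration and are not part of CZTop. The sketch assumes a transport that raises `IO::EAGAINWaitWritable` when its send timeout is reached.

```ruby
# Hypothetical stand-in for a transport whose send times out a few times
# before it finally succeeds. Not part of CZTop.
class FlakyPipe
  attr_reader :sent

  def initialize(failures)
    @failures = failures
    @sent = []
  end

  def send_message(msg)
    if @failures > 0
      @failures -= 1
      raise IO::EAGAINWaitWritable, "send timeout reached"
    end
    @sent << msg
  end
end

# Hypothetical error class, playing the role of a "receiver is dead" error.
class DeadError < StandardError; end

# Tries to send under the lock. On a send timeout the whole begin block is
# retried, so the liveness check runs again before every attempt.
def send_with_retry(pipe, msg, mutex, running)
  attempts = 0
  begin
    attempts += 1
    mutex.synchronize do
      raise DeadError unless running.call
      pipe.send_message(msg)
    end
  rescue IO::EAGAINWaitWritable
    retry
  end
  attempts
end

mutex = Mutex.new
pipe = FlakyPipe.new(2) # the first two sends time out
attempts = send_with_retry(pipe, "hello", mutex, -> { true })
```

Because the liveness check sits inside the retried block, the loop ends with an error as soon as the receiver is marked dead, instead of retrying forever.
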
crashed?()

@return [Boolean] whether this actor has crashed
@see exception

# File lib/cztop/actor.rb, line 189
def crashed?
  !!@exception # if set, it has crashed
end
dead?()

@return [Boolean] whether this actor is dead (terminated or crashed)

# File lib/cztop/actor.rb, line 183
def dead?
  !@running
end
receive()

Receive a message from the actor.

@return [Message]
@raise [DeadActorError] if actor is terminated

Calls superclass method CZTop::SendReceiveMethods#receive
# File lib/cztop/actor.rb, line 118
def receive
  @mtx.synchronize do
    raise DeadActorError if not @running
    super
  end
end
request(message)

Same as {#<<}, but also waits for a response from the actor and returns it.

@param message [Message] the request to the actor
@return [Message] the actor's response
@raise [DeadActorError] if actor is terminated
@raise [ArgumentError] if the message is “$TERM” (use {#terminate})

# File lib/cztop/actor.rb, line 130
def request(message)
  @mtx.synchronize do
    raise DeadActorError if not @running
    message = Message.coerce(message)
    raise ArgumentError, "use #terminate" if TERM == message[0]
    message.send_to(self)
    Message.receive_from(self)
  end
rescue IO::EAGAINWaitWritable
  # same as in #<<
  retry
end
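
`#request` sends and receives inside one critical section. The sketch below illustrates why that matters when several threads issue requests at once. It uses only the standard library: `EchoService` is a hypothetical stand-in (a worker thread plus two queues), not CZTop's implementation.

```ruby
# Hypothetical stand-in for an actor: a single worker thread that answers
# requests in order over a pair of queues. Not part of CZTop.
class EchoService
  def initialize
    @mtx = Mutex.new
    @inbox = Queue.new
    @outbox = Queue.new
    @worker = Thread.new do
      while (msg = @inbox.pop) != :stop
        @outbox << "reply to #{msg}"
      end
    end
  end

  # Send and receive under the same lock, so no other thread can consume
  # the reply that belongs to this caller.
  def request(msg)
    @mtx.synchronize do
      @inbox << msg
      @outbox.pop
    end
  end

  def stop
    @inbox << :stop
    @worker.join
  end
end

service = EchoService.new
threads = 8.times.map { |i| Thread.new { [i, service.request("req-#{i}")] } }
replies = threads.map(&:value)
service.stop
```

If the lock covered only the send, two callers could each pop the other's reply. Holding it across both steps pairs every request with its own response.
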
send_picture(picture, *args)

Sends a message according to a “picture”.

@see zsock_send() on api.zeromq.org/czmq3-0:zsock
@note Mainly added for {Beacon}. If implemented there, it wouldn't be thread safe. And it's not that useful to be added to {SendReceiveMethods}.
@param picture [String] message's part types
@param args [String, Integer, …] values, in FFI style (each one preceded by its type, like <tt>:string, "foo"</tt>)
@return [void]

# File lib/cztop/actor.rb, line 152
def send_picture(picture, *args)
  @mtx.synchronize do
    raise DeadActorError if not @running
    Zsock.send(ffi_delegate, picture, *args)
  end
end
terminate()

Tells the actor to terminate and waits for it. Idempotent.

@return [Boolean] whether it died just now (false if it was dead already)

# File lib/cztop/actor.rb, line 170
def terminate
  @mtx.synchronize do
    return false if not @running
    Message.new(TERM).send_to(self)
    await_handler_death
    true
  end
rescue IO::EAGAINWaitWritable
  # same as in #<<
  retry
end
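
The idempotent contract of `#terminate` (true on the call that actually stops the handler, false afterwards) can be sketched with the standard library alone. `Worker` below is a hypothetical stand-in that uses a plain thread and queue. It is not CZTop's implementation and only mirrors the return-value behaviour described above.

```ruby
# Hypothetical stand-in for an actor with an idempotent #terminate.
# Not part of CZTop.
class Worker
  TERM = "$TERM"

  def initialize
    @mtx = Mutex.new
    @running = true
    @inbox = Queue.new
    # The handler thread simply drains messages until it sees TERM.
    @thread = Thread.new do
      loop { break if @inbox.pop == TERM }
    end
  end

  # Returns true if this call stopped the worker, false if it was
  # already dead.
  def terminate
    @mtx.synchronize do
      return false unless @running
      @inbox << TERM
      @thread.join # wait for the handler to finish
      @running = false
      true
    end
  end

  def dead?
    !@running
  end
end

worker = Worker.new
first = worker.terminate
second = worker.terminate
```

Checking the flag and sending the termination command under the same lock means that concurrent callers cannot both send "$TERM".
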
wait()

Thread-safe {PolymorphicZsockMethods#wait}.

@return [Integer]

Calls superclass method CZTop::PolymorphicZsockMethods#wait
# File lib/cztop/actor.rb, line 161
def wait
  @mtx.synchronize do
    super
  end
end

Private Instance Methods

await_handler_death()

Waits for the C or Ruby handler to die.

@return [void]

# File lib/cztop/actor.rb, line 291
def await_handler_death
  if handler_shimmed?
    # for Ruby block/Proc object handlers
    @handler_dead_signal.pop

  else
    # for handlers that are passed as C functions, we rely on normal death
    # signal

    # can't use #wait here because of recursive deadlock
    Zsock.wait(ffi_delegate)

    @running = false
  end
end
handler_shimmed?()

@return [Boolean] whether the handler is a Ruby object, like a simple block (as opposed to an FFI::Pointer to a C function)

# File lib/cztop/actor.rb, line 226
def handler_shimmed?
  !!@handler_thread # if it exists, it's shimmed
end
next_message()

Receives the next message even across any interrupts.

@return [Message] the next message

# File lib/cztop/actor.rb, line 261
def next_message
  @pipe.receive
end
process_messages(handler)

Successively receives messages that were sent to the actor and yields them to the given handler for processing. A pipe (a {Socket::PAIR} socket) is also passed to the handler so it can send back the result of a command, if needed.

When a message is “$TERM”, or when waiting for a message is interrupted, the loop stops and the actor terminates.

@param handler [Proc, call] the handler used to process messages
@yieldparam message [Message] message (e.g. command) received
@yieldparam pipe [Socket::PAIR] pipe to write back something into the actor

# File lib/cztop/actor.rb, line 245
def process_messages(handler)
  while true
    begin
      message = next_message
    rescue Interrupt
      break
    else
      break if TERM == message[0]
    end

    handler.call(message, @pipe)
  end
end
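
The control flow of this loop can be exercised without a real socket. In the sketch below, `FakePipe` is a hypothetical stand-in for the PAIR socket (it is not a CZTop class): `#receive` hands out canned messages as arrays of frames and raises `Interrupt` once they run out, and `#<<` records replies. The top-level `process_messages` is a simplified copy that takes the pipe as an argument.

```ruby
TERM = "$TERM"

# Hypothetical stand-in for the PAIR socket. Not part of CZTop.
class FakePipe
  attr_reader :replies

  def initialize(incoming)
    @incoming = incoming.dup
    @replies = []
  end

  # Returns the next canned message, or simulates an interrupted wait.
  def receive
    @incoming.shift or raise Interrupt
  end

  def <<(reply)
    @replies << reply
    self
  end
end

# Receives messages until TERM arrives or the wait is interrupted, and
# passes every other message to the handler together with the pipe.
def process_messages(pipe, handler)
  loop do
    begin
      message = pipe.receive
    rescue Interrupt
      break
    end
    break if message[0] == TERM
    handler.call(message, pipe)
  end
end

seen = []
handler = lambda do |msg, pipe|
  seen << msg[0]
  pipe << "pong" if msg[0] == "ping"
end

pipe = FakePipe.new([["ping"], ["note", "x"], [TERM], ["ignored"]])
process_messages(pipe, handler)
```

The handler never sees the "$TERM" message or anything queued after it, which matches the termination rule described above.
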
shim(handler)

Shims the given handler. The shim is used to do the handshake, to call {#process_messages}, and to ensure we're notified when the handler has terminated.

@param handler [Proc, call] the handler used to process messages
@return [FFI::Function] the callback function to be passed to the zactor
@raise [ArgumentError] if invalid handler given

# File lib/cztop/actor.rb, line 202
def shim(handler)
  raise ArgumentError, "invalid handler" if !handler.respond_to?(:call)

  @handler_thread = nil
  @handler_dead_signal = Queue.new # used for signaling

  Zactor.fn do |pipe_delegate, _args|
    begin
      @mtx.synchronize do
        @handler_thread = Thread.current
        @pipe = Socket::PAIR.from_ffi_delegate(pipe_delegate)
        @pipe.signal # handshake, so zactor_new() returns
      end
      process_messages(handler)
    rescue Exception
      @exception = $!
    ensure
      signal_shimmed_handler_death
    end
  end
end
signal_shimmed_handler_death()

Creates a new thread that will signal the definitive termination of the Ruby handler.

This is needed to avoid the race condition between zactor_destroy(), which will wait for a signal from the handler in case it was able to send the “$TERM” command, and the @callback, which might not have returned yet but no longer receives any messages.

@return [void]

# File lib/cztop/actor.rb, line 274
def signal_shimmed_handler_death
  # NOTE: can't just use ConditionVariable, as the signaling code might be
  # run BEFORE the waiting code.

  Thread.new do
    @handler_thread.join

    # NOTE: we do this here and not in #terminate, so it also works when
    # actor isn't terminated using #terminate
    @running = false

    @handler_dead_signal.push(nil)
  end
end
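
The note about `ConditionVariable` comes down to whether a signal sent before anyone is waiting gets remembered. A minimal standard-library sketch of the difference follows; the variable names are chosen for illustration only.

```ruby
# Queue: the "signal" is pushed before any thread is waiting, and the
# later pop still sees it because the queue stores the item.
done = Queue.new
done.push(nil)
waiter = Thread.new do
  done.pop
  :woken
end
queue_result = waiter.join(1) ? waiter.value : :timed_out

# ConditionVariable: signalling with no waiter has no lasting effect, so
# a wait that starts afterwards has to rely on its timeout to return.
mtx = Mutex.new
cv = ConditionVariable.new
cv.signal
started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
mtx.synchronize { cv.wait(mtx, 0.2) } # bounded so the example terminates
waited = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
```

Here `queue_result` ends up as `:woken`, while the condition-variable wait is expected to run for roughly its full timeout. This is why a `Queue` is the safer choice when the handler may die before `#terminate` starts waiting.
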