class Backports::Ractor

Constants

RACTOR_STATE
RactorThreadGroups

Attributes

main[R]
ractor_incoming_queue[R]

@api private

ractor_outgoing_queue[R]

@api private

ractor_thread[R]

@api private

Public Class Methods

count() click to toggle source
# File lib/backports/ractor/ractor.rb, line 226
# Number of Ractor instances found in ObjectSpace for which
# +ractor_live?+ returns a truthy value.
#
# @return [Integer]
def count
  live = ::ObjectSpace.each_object(Ractor).select { |ractor| ractor.ractor_live? }
  live.size
end
current() click to toggle source
# File lib/backports/ractor/ractor.rb, line 221
# Returns the Ractor associated with the calling thread.
#
# The result is cached in the +:backports_ractor+ thread variable; on a cache
# miss it is looked up with +ractor_find_current+ and stored for next time.
def current
  thread = ::Thread.current
  cached = thread.thread_variable_get(:backports_ractor)
  return cached if cached

  thread.thread_variable_set(:backports_ractor, ractor_find_current)
end
make_shareable(obj) click to toggle source
# File lib/backports/ractor/ractor.rb, line 211
# Makes +obj+ shareable by running the shareability check with
# +freeze_all+ set to +true+.
#
# @param obj [Object] the object to make shareable
# @return [Object] +obj+ itself
# @raise [Ractor::Error] if the check still fails
def make_shareable(obj)
  unless ractor_check_shareability?(obj, true)
    raise Ractor::Error, '#freeze does not freeze object correctly'
  end

  obj
end
new(*args, &block) click to toggle source

Implementation notes

Uses one `Thread` for each `Ractor`, as well as queues for communication

The incoming queue is strict: contrary to a standard queue, you can't pop from an empty closed queue. Since standard queues return `nil` in those conditions, we wrap/unwrap `nil` values and consider all `nil` values to be results of closed queues. `ClosedQueueError` is re-raised as `Ractor::ClosedError`.

The outgoing queue is strict and blocking. Same wrapping / raising as incoming, with an extra queue to acknowledge when a value has been read (or if the port is closed while waiting).

The last result is a bit tricky as it needs to be pushed on the outgoing queue but can not be blocking. For this, we “soft close” the outgoing port.

# File lib/backports/ractor/ractor.rb, line 30
# Sets up the queues, name and id of a new Ractor.
#
# When no main Ractor exists yet, the instance adopts the calling thread and
# becomes the main Ractor. Otherwise every argument is isolated (copied) and
# the block is started through +ractor_thread_start+.
#
# @param args [Array] arguments for the block; a trailing single-key
#   +{name: ...}+ hash is consumed as the Ractor's name
# @raise [::ArgumentError] if no block is given
def initialize(*args, &block)
  @ractor_incoming_queue = IncomingQueue.new
  @ractor_outgoing_queue = OutgoingQueue.new
  raise ::ArgumentError, 'must be called with a block' unless block

  # A trailing hash whose only key is :name is the `name:` option, not a block argument.
  trailing = args.last
  given_name = args.pop[:name] if trailing.is_a?(::Hash) && trailing.size == 1 && trailing.key?(:name)
  @ractor_name = given_name && Backports.coerce_to_str(given_name)

  @id = Ractor.ractor_next_id

  if Ractor.main == nil # then initializing main Ractor
    @ractor_thread = ::Thread.current
    @ractor_origin = nil
    @ractor_thread.thread_variable_set(:backports_ractor, self)
    return
  end

  # Keep only the part of the caller's frame that precedes ":in `".
  @ractor_origin = caller(1, 1).first.split(':in `').first

  isolated_args = args.map { |arg| Ractor.ractor_isolate(arg, false) }
  ractor_thread_start(isolated_args, block)
end
ractor_isolate(val, move = false) click to toggle source

@api private

# File lib/backports/ractor/sharing.rb, line 7
# Prepares +val+ for crossing a Ractor boundary.
#
# @param val [Object] value being passed
# @param move [Boolean] when truthy the value is handed over as is;
#   otherwise a deep clone made by +Cloner.deep_clone+ is returned
# @return [Object]
def ractor_isolate(val, move = false)
  move ? val : Cloner.deep_clone(val)
end
ractor_mark_set_shareable(visited) click to toggle source
# File lib/backports/ractor/sharing.rb, line 49
# Records every visited object as shareable in the +@ractor_shareable+ cache.
#
# +visited+ is normally the Hash built during traversal, whose keys are the
# visited objects. Iterating it with a one-argument +each+ block would yield
# +[key, value]+ pairs, so temporary arrays would be cached instead of the
# objects themselves; the keys are therefore iterated explicitly. Any other
# enumerable is still iterated element by element.
#
# @param visited [Hash, Enumerable] the visited objects
def ractor_mark_set_shareable(visited)
  if visited.respond_to?(:each_key)
    visited.each_key { |obj| @ractor_shareable[obj] = Ractor }
  else
    visited.each { |obj| @ractor_shareable[obj] = Ractor }
  end
end
ractor_next_id() click to toggle source

@api private

# File lib/backports/ractor/ractor.rb, line 243
# Increments the counter kept in +@id+ (starting from 0) and returns the new value.
#
# @return [Integer] 1 on the first call, then 2, 3, ...
def ractor_next_id
  @id = (@id || 0) + 1
end
ractor_reset() click to toggle source

@api private

# File lib/backports/ractor/ractor.rb, line 231
# Kills and joins the thread of every Ractor other than the current one,
# then clears the current Ractor's incoming queue.
def ractor_reset
  ::ObjectSpace.each_object(Ractor).each do |ractor|
    next if ractor == Ractor.current

    thread = ractor.ractor_thread
    next unless thread

    thread.kill
    thread.join
  end

  Ractor.current.ractor_incoming_queue.clear
end
ractor_shareable_self?(obj, freeze_all) { || ... } click to toggle source

Yields if shareability can't be determined without looking at the object's parts; the block's result is then returned.

# File lib/backports/ractor/sharing.rb, line 26
# Decides whether +obj+ is shareable, ignoring its parts where possible.
#
# Returns +true+ right away when +obj+ is already in the shareable cache or
# is shareable by nature. Otherwise, if +obj+ is frozen (or gets frozen here
# because +freeze_all+ is set), the block is yielded to and its result is
# returned. An object that stays unfrozen gives +false+.
def ractor_shareable_self?(obj, freeze_all)
  known = @ractor_shareable.key?(obj) || ractor_shareable_by_nature?(obj, freeze_all)
  return true if known

  frozen = obj.frozen? || (freeze_all && obj.freeze)
  frozen ? yield : false
end
receive() click to toggle source
# File lib/backports/ractor/ractor.rb, line 171
# Class-level convenience: invokes +receive+ on the current Ractor.
# +__send__+ is used, so the call works even if that method is not public.
def receive
  current.__send__(:receive)
end
Also aliased as: recv
receive_if(&block) click to toggle source
# File lib/backports/ractor/ractor.rb, line 176
# Class-level convenience: invokes +receive_if+ on the current Ractor,
# forwarding the given block. +__send__+ is used, so the call works even if
# that method is not public.
def receive_if(&block)
  current.__send__(:receive_if, &block)
end
recv()
Alias for: receive
select(*ractors, yield_value: not_given = true, move: false) click to toggle source
# File lib/backports/ractor/ractor.rb, line 180
# Polls the given ractors until a value can be taken from one of them, or,
# when +yield_value+ is supplied, until that value can be handed to a waiting
# consumer of the current Ractor's outgoing queue.
#
# @param ractors [Array<Ractor>] ractors to watch; the current Ractor is
#   watched through its incoming queue, any other through its outgoing queue
# @param yield_value [Object] optional value to yield; the +not_given = true+
#   default expression only runs when the keyword is omitted, which is how
#   "not given" is told apart from an explicit +nil+
# @param move [Boolean] passed to +ractor_isolate+ for +yield_value+
# @return [Array] +[ractor, value]+ for a value popped from another ractor,
#   +[:receive, value]+ for a value from the current Ractor's incoming queue,
#   or +[:yield, nil]+ once +yield_value+ has been pushed
# @raise [::ArgumentError] if neither a ractor nor +yield_value+ is given
def select(*ractors, yield_value: not_given = true, move: false)
  cur = Ractor.current
  queues = ractors.map do |r|
    r == cur ? r.ractor_incoming_queue : r.ractor_outgoing_queue
  end
  if !not_given
    # A value was supplied: prepare it for yielding on our own outgoing queue.
    out = current.ractor_outgoing_queue
    yield_value = ractor_isolate(yield_value, move)
  elsif ractors.empty?
    raise ::ArgumentError, 'specify at least one ractor or `yield_value`'
  end

  while true # rubocop:disable Style/InfiniteLoop
              # Don't `loop`, in case of `ClosedError` (not that there should be any)
    queues.each_with_index do |q, i|
      # The block only runs when a value was available; `return` leaves `select`.
      q.pop_non_blocking do |val|
        r = ractors[i]
        return [r == cur ? :receive : r, val]
      end
    end

    # `out` is only set when a yield_value was given.
    if out && out.num_waiting > 0
      # Not quite atomic...
      out.push(yield_value, ack: true)
      return [:yield, nil]
    end

    # Nothing ready yet: back off briefly before polling again.
    sleep(0.001)
  end
end
shareable?(obj) click to toggle source
# File lib/backports/ractor/ractor.rb, line 217
# Tells whether +obj+ is shareable, by running the shareability check with
# +freeze_all+ set to +false+.
def shareable?(obj)
  ractor_check_shareability?(obj, false)
end
yield(value, move: false) click to toggle source
# File lib/backports/ractor/ractor.rb, line 164
# Pushes +value+ (isolated according to +move+) onto the current Ractor's
# outgoing queue with +ack: true+.
#
# @param value [Object] the value to yield
# @param move [Boolean] passed to +ractor_isolate+
# @raise [ClosedError] if the queue raises +ClosedQueueError+
def yield(value, move: false)
  begin
    isolated = ractor_isolate(value, move)
    current.ractor_outgoing_queue.push(isolated, ack: true)
  rescue ::ClosedQueueError
    raise ClosedError, 'The outgoing-port is already closed'
  end
end

Private Class Methods

ractor_check_shareability?(obj, freeze_all) click to toggle source
# File lib/backports/ractor/sharing.rb, line 13
        # Checks whether +obj+ and, when needed, all of its parts are shareable.
        #
        # The parts are only traversed when +ractor_shareable_self?+ yields. On
        # success every visited object is passed to +ractor_mark_set_shareable+.
        #
        # The +visited+ set compares keys by identity. With a regular Hash, two
        # distinct but +eql?+ containers (e.g. two equal frozen arrays) would be
        # treated as the same object, and the parts of the second one would
        # never be checked or frozen.
        #
        # @param obj [Object] object to check
        # @param freeze_all [Boolean] passed through to the per-object checks
        # @return [Boolean]
        def ractor_check_shareability?(obj, freeze_all)
  ractor_shareable_self?(obj, freeze_all) do
    visited = {}.compare_by_identity

    # `return` exits the whole method, not just this block.
    return false unless ractor_shareable_parts?(obj, freeze_all, visited)

    ractor_mark_set_shareable(visited)

    true
  end
end
ractor_find_current() click to toggle source
# File lib/backports/ractor/ractor.rb, line 256
        # Looks up the Ractor registered in +RactorThreadGroups+ for the calling
        # thread's thread group.
        def ractor_find_current
  RactorThreadGroups[Thread.current.group]
end
ractor_init() click to toggle source
# File lib/backports/ractor/ractor.rb, line 250
        # Bootstraps class-level state: creates the weak map used as the
        # shareable-object cache, builds the main Ractor (stored in +@main+) and
        # registers it for the default thread group.
        def ractor_init
  @ractor_shareable = ::ObjectSpace::WeakMap.new
  # The block is a placeholder; `new` requires one.
  @main = Ractor.new { nil }
  RactorThreadGroups[::ThreadGroup::Default] = @main
end
ractor_shareable_by_nature?(obj, freeze_all) click to toggle source
# File lib/backports/ractor/sharing.rb, line 81
        # Tells whether +obj+ is shareable purely because of what kind of object
        # it is, without looking at its frozen state or its parts.
        #
        # @param obj [Object] object to classify
        # @param freeze_all [Boolean] whether the caller is making +obj+ shareable
        # @return [Boolean]
        def ractor_shareable_by_nature?(obj, freeze_all)
  case obj
  when ::Regexp, ::Range, ::Numeric
    # Assume that these are literals that would have been frozen in 3.0
    # unless we're making them shareable, in which case we might as well
    # freeze them for real.
    !freeze_all
  when ::Module, Ractor, ::Symbol, true, false, nil
    # Symbol, true, false and nil were only frozen in Ruby 2.3+
    true
  else
    false
  end
end
ractor_shareable_parts?(obj, freeze_all, visited) click to toggle source
# File lib/backports/ractor/sharing.rb, line 36
        # Recursively checks that every part of +obj+ is shareable.
        #
        # +visited+ records the objects already handled, so an object that is
        # reached again is not traversed a second time.
        #
        # @param obj [Object] object whose parts are checked
        # @param freeze_all [Boolean] passed through to the per-part check
        # @param visited [Hash] objects already traversed; mutated in place
        # @return [Boolean] +false+ as soon as one part is not shareable
        def ractor_shareable_parts?(obj, freeze_all, visited)
  return true if visited.key?(obj)

  visited[obj] = true

  ractor_traverse(obj) do |part|
    part_ok = ractor_shareable_self?(part, freeze_all) { ractor_shareable_parts?(part, freeze_all, visited) }
    # `return` leaves the whole method on the first failing part.
    return false unless part_ok
  end

  true
end
ractor_traverse(obj) { |default_proc| ... } click to toggle source
# File lib/backports/ractor/sharing.rb, line 55
        # Yields each direct part of +obj+ that matters for shareability:
        #
        # * Hash: the default value, the default proc, then every key and value
        # * Range: both bounds
        # * Array / Struct: every element
        # * Complex: real and imaginary parts
        # * Rational: numerator and denominator
        #
        # followed, for any object, by the value of each instance variable.
        #
        # @param obj [Object] object to take apart
        # @yieldparam part [Object] one direct part of +obj+
        def ractor_traverse(obj, &block)
  case obj
  when ::Hash
    # The default value is a part of the hash too. It has to be yielded:
    # calling Kernel#Hash on it raises TypeError for defaults such as `0`.
    yield obj.default
    yield obj.default_proc
    obj.each do |key, value|
      yield key
      yield value
    end
  when ::Range
    yield obj.begin
    yield obj.end
  when ::Array, ::Struct
    obj.each(&block)
  when ::Complex
    yield obj.real
    yield obj.imaginary
  when ::Rational
    yield obj.numerator
    yield obj.denominator
  end
  obj.instance_variables.each do |var|
    yield obj.instance_variable_get(var)
  end
end

Public Instance Methods

<<(obj, move: false)
Alias for: send
[](key) click to toggle source
# File lib/backports/ractor/ractor.rb, line 150
# Reads +key+ from the Ractor-local storage of the *current* Ractor
# (+Ractor.current+), regardless of the receiver.
def [](key)
  Ractor.current.ractor_locals[key]
end
[]=(key, value) click to toggle source
# File lib/backports/ractor/ractor.rb, line 154
# Stores +value+ under +key+ in the Ractor-local storage of the *current*
# Ractor (+Ractor.current+), regardless of the receiver.
def []=(key, value)
  Ractor.current.ractor_locals[key] = value
end
close_incoming() click to toggle source
# File lib/backports/ractor/ractor.rb, line 129
# Closes the incoming queue.
#
# @return [Boolean] whether the queue was already closed before this call
def close_incoming
  queue = ractor_incoming_queue
  queue.closed?.tap { queue.close }
end
close_outgoing() click to toggle source
# File lib/backports/ractor/ractor.rb, line 135
# Closes the outgoing queue.
#
# @return [Boolean] whether the queue was already closed before this call
def close_outgoing
  queue = ractor_outgoing_queue
  queue.closed?.tap { queue.close }
end
inspect() click to toggle source
# File lib/backports/ractor/ractor.rb, line 117
def inspect
  state = RACTOR_STATE[@ractor_thread ? @ractor_thread.status : 'run']
  info = [
    "Ractor:##{@id}",
    name,
    @ractor_origin,
    state,
  ].compact.join(' ')

  "#<#{info}>"
end
name() click to toggle source
# File lib/backports/ractor/ractor.rb, line 104
# The value stored in +@ractor_name+ (+nil+ when none has been set).
def name
  @ractor_name
end
ractor_live?() click to toggle source

@api private

# File lib/backports/ractor/ractor.rb, line 262
# Truthy while this Ractor counts as alive.
#
# @return [true, String, false, nil] +true+ when no thread has been assigned
#   yet, otherwise the thread's +status+ (falsy once the thread is dead)
def ractor_live?
  # May happen if `count` is called from another thread before `initialize` has completed
  return true unless defined?(@ractor_thread)

  @ractor_thread.status
end
ractor_locals() click to toggle source

@api private

# File lib/backports/ractor/ractor.rb, line 159
# Lazily created storage for Ractor-local values: a Hash that compares keys
# by identity. The same Hash is returned on every call.
def ractor_locals
  return @ractor_locals if @ractor_locals

  @ractor_locals = {}.compare_by_identity
end
send(obj, move: false) click to toggle source
# File lib/backports/ractor/ractor.rb, line 92
# Appends +obj+ (isolated according to +move+) to this Ractor's incoming queue.
#
# @param obj [Object] message to deliver
# @param move [Boolean] passed to +Ractor.ractor_isolate+
# @return [self]
# @raise [ClosedError] if the queue raises +ClosedQueueError+
def send(obj, move: false)
  message = Ractor.ractor_isolate(obj, move)
  ractor_incoming_queue << message
  self
rescue ::ClosedQueueError
  raise ClosedError, 'The incoming-port is already closed'
end
Also aliased as: <<
take() click to toggle source
# File lib/backports/ractor/ractor.rb, line 100
# Pops a value from this Ractor's outgoing queue, passing +ack: true+ to the queue.
def take
  ractor_outgoing_queue.pop(ack: true)
end

Private Instance Methods

ractor_thread_start(args, block) click to toggle source
# File lib/backports/ractor/ractor.rb, line 55
        # Starts the thread that runs this Ractor's block.
        #
        # Inside the new thread: the thread is recorded, placed in a fresh
        # thread group that is registered in +RactorThreadGroups+, and tagged
        # with the +:backports_ractor+ thread variable. The block is then run
        # with +instance_exec+. Whatever the outcome, the result is passed to
        # +ractor_thread_terminate+.
        #
        # @param args [Array] arguments passed to the block
        # @param block [Proc] the Ractor's body
        def ractor_thread_start(args, block)
  ::Thread.new do
    @ractor_thread = ::Thread.current
    @ractor_thread_group = ::ThreadGroup.new
    RactorThreadGroups[@ractor_thread_group] = self
    @ractor_thread_group.add(@ractor_thread)
    ::Thread.current.thread_variable_set(:backports_ractor, self)
    result = nil
    begin
      result = instance_exec(*args, &block)
    rescue ::Exception => err # rubocop:disable Lint/RescueException
      # Any exception from the block becomes the result, wrapped in a RemoteError.
      begin
        raise RemoteError, "thrown by remote Ractor: #{err.message}"
      rescue RemoteError => e # Hack to create exception with `cause`
        result = OutgoingQueue::WrappedException.new(e)
      end
    ensure
      # Runs for both normal completion and failure.
      ractor_thread_terminate(result)
    end
  end
end
ractor_thread_terminate(result) click to toggle source
# File lib/backports/ractor/ractor.rb, line 77
        # Finishes the Ractor's thread: publishes +result+ on the outgoing queue
        # (without waiting for an acknowledgement) unless that queue is already
        # closed, closes the incoming queue and soft-closes the outgoing one.
        # In every case, all other threads of this Ractor's thread group are
        # killed afterwards.
        #
        # @param result [Object] the block's return value, or the wrapped exception
        def ractor_thread_terminate(result)
  begin
    ractor_outgoing_queue.push(result, ack: false) unless ractor_outgoing_queue.closed?
  rescue ::ClosedQueueError
    # The queue got closed in the meantime; skip the closing steps below
    # (the `ensure` clause still runs).
    return # ignore
  end
  ractor_incoming_queue.close
  ractor_outgoing_queue.close(:soft)
ensure
  # TODO: synchronize?
  @ractor_thread_group.list.each do |thread|
    thread.kill unless thread == Thread.current
  end
end
receive() click to toggle source
# File lib/backports/ractor/ractor.rb, line 141
        # Pops the next message from this Ractor's incoming queue.
        def receive
  ractor_incoming_queue.pop
end
receive_if(&block) click to toggle source
# File lib/backports/ractor/ractor.rb, line 145
        # Pops from this Ractor's incoming queue, handing the block to the
        # queue's +pop+.
        #
        # @raise [::ArgumentError] if no block is given
        def receive_if(&block)
  raise ::ArgumentError, 'no block given' if block.nil?

  ractor_incoming_queue.pop(&block)
end