class Async::ResourcePool::Simple

Public Class Methods

new(limit, wakeup_strategy = :immediately) click to toggle source

@param limit [Integer] - maximum number of resources that can be acquired at once; must be greater than 0. @param wakeup_strategy [Symbol] - either :immediately or :next_loop.

# File lib/async/resource_pool/simple.rb, line 10
# Builds an empty pool that allows at most +limit+ simultaneous owners.
#
# @param limit [Integer] maximum number of owners; must be greater than 0.
# @param wakeup_strategy [Symbol] :immediately or :next_loop.
# @raise [ArgumentError] if limit is not positive or the strategy is unknown.
def initialize(limit, wakeup_strategy = :immediately)
  raise ArgumentError, 'limit must be greater than 0' if limit <= 0

  known_strategy = %i[immediately next_loop].include?(wakeup_strategy)
  raise ArgumentError, 'wakeup_strategy must be :immediately or :next_loop' unless known_strategy

  # Guards mutations of the owners/waiters lists made while acquiring.
  @thread_mutex = Mutex.new
  @owners = []
  @waiters = []
  @wakeup_strategy = wakeup_strategy
  @limit = limit
end

Public Instance Methods

acquire(timeout = nil) { || ... } click to toggle source

Acquires a resource for the current fiber if one is available; otherwise yields to the reactor and is resumed once a resource becomes available. Raises Async::ResourcePool::TimeoutError if timeout is not nil and no resource becomes available within timeout seconds. Raises Async::ResourcePool::AlreadyOwnError if the current fiber already owns a resource. When a block is given, it is executed while the resource is held and the resource is released afterwards. @param timeout [Integer|Float] - timeout in seconds (default nil).

# File lib/async/resource_pool/simple.rb, line 28
# Takes a resource for the current fiber, waiting for one if the pool is full.
#
# When a block is given it runs while the resource is held, and the resource
# is released afterwards even if the block raises.
#
# @param timeout [Integer, Float, nil] seconds to wait; nil waits without a timeout.
# @raise [Async::ResourcePool::AlreadyOwnError] if the current fiber already owns a resource.
# @return [nil]
def acquire(timeout = nil)
  raise Async::ResourcePool::AlreadyOwnError.new if already_acquired?

  unless acquire_if_available
    # The fiber is now queued as a waiter; suspend until it is resumed.
    if timeout.nil?
      Async::Task.yield
    else
      wait_with_timeout(timeout)
    end
    # Resumed: register the current fiber as an owner.
    @thread_mutex.synchronize { @owners << Fiber.current }
  end

  return nil unless block_given?

  begin
    yield
  ensure
    release
  end
  nil
end
already_acquired?() click to toggle source

@return [True|False] returns true if resource already acquired by fiber

# File lib/async/resource_pool/simple.rb, line 72
# Tells whether the calling fiber is currently listed among the pool's owners.
#
# @return [Boolean] true if the current fiber owns a resource.
def already_acquired?
  current = Fiber.current
  @owners.include?(current)
end
can_be_acquired?() click to toggle source

@return [True|False] returns true if pool has available resource

# File lib/async/resource_pool/simple.rb, line 77
# Tells whether the pool still has room for another owner.
#
# @return [Boolean] true while the owner count is below the limit.
def can_be_acquired?
  taken = @owners.size
  taken < @limit
end
info() click to toggle source

@return [Hash] represents current state of resource pool.

waiters - how many fibers are waiting for a resource.
owners - how many fibers own resource.
limit - maximum of resources that can be owned simultaneously.
# File lib/async/resource_pool/simple.rb, line 85
# Snapshot of the pool's current state.
#
# @return [Hash] :waiters (fibers queued), :owners (fibers holding a resource)
#   and :limit (maximum simultaneous owners).
def info
  waiting = @waiters.size
  owning = @owners.size
  { waiters: waiting, owners: owning, limit: @limit }
end
release() click to toggle source

Releases the resource held by the current fiber. If wakeup_strategy == :immediately, the first fiber waiting for a resource is resumed immediately. If wakeup_strategy == :next_loop, the first waiting fiber is resumed in the next reactor loop. Raises Async::ResourcePool::DoesNotOwnError if the current fiber does not own a resource.

# File lib/async/resource_pool/simple.rb, line 65
# Gives back the resource held by the current fiber and wakes a waiter, if any.
#
# @raise [Async::ResourcePool::DoesNotOwnError] if the current fiber is not an owner.
# @return whatever #wakeup returns.
def release
  owned = already_acquired?
  raise Async::ResourcePool::DoesNotOwnError.new unless owned

  releasing = Fiber.current
  @owners.delete(releasing)
  wakeup
end
try_acquire() click to toggle source

Acquires resource for current fiber if resource available. Will raise Async::ResourcePool::AlreadyOwnError if resource already acquired. @return [True|False] returns true if resource was acquired.

# File lib/async/resource_pool/simple.rb, line 50
# Non-blocking acquire: takes a resource only if one is free right now.
#
# @raise [Async::ResourcePool::AlreadyOwnError] if the current fiber already owns a resource.
# @return [Boolean] true if the resource was acquired, false otherwise.
def try_acquire
  raise Async::ResourcePool::AlreadyOwnError.new if already_acquired?

  return true if acquire_if_available

  # acquire_if_available queued this fiber as a waiter; undo that since we do not wait.
  @waiters.delete(Fiber.current)
  false
end

Private Instance Methods

acquire_if_available() click to toggle source
# File lib/async/resource_pool/simple.rb, line 115
# Under the mutex, either records the current fiber as an owner (when the pool
# has room) or queues it as a waiter.
#
# @return [Boolean] true if the fiber became an owner, false if it was queued.
def acquire_if_available
  current = Fiber.current

  @thread_mutex.synchronize do
    unless can_be_acquired?
      @waiters << current
      next false
    end

    @owners << current
    true
  end
end
wait_with_timeout(timeout) click to toggle source
# File lib/async/resource_pool/simple.rb, line 129
# Suspends the current fiber until it is resumed, giving up after +timeout+.
#
# On Async::TimeoutError the fiber is removed from the waiters list and the
# pool's own timeout error is raised instead.
#
# @param timeout [Integer, Float] value passed to with_timeout and to the raised error.
# @raise [Async::ResourcePool::TimeoutError] if the wait times out.
def wait_with_timeout(timeout)
  waiter = Fiber.current

  Async::Task.current.with_timeout(timeout) do |timer|
    begin
      Async::Task.yield
      # Resumed before the timeout fired: cancel the timer.
      timer.cancel
    rescue Async::TimeoutError
      @waiters.delete(waiter)
      raise Async::ResourcePool::TimeoutError.new(timeout)
    end
  end
end
wakeup() click to toggle source
# File lib/async/resource_pool/simple.rb, line 95
# Hands the freed resource to the next waiting fiber, if there is one.
#
# Waiters whose fibers are no longer alive are discarded and the next waiter
# is tried, so a dead fiber at the head of the queue cannot swallow the wakeup
# and leave the remaining waiters suspended.
#
# With the :immediately strategy the waiter is resumed right here; otherwise
# it is appended to the current task's reactor.
#
# @return [nil] when no live waiter exists; otherwise the result of resuming
#   or scheduling the waiter.
def wakeup
  fiber = @waiters.shift
  # Drop dead waiters until a live one (or the end of the queue) is reached.
  fiber = @waiters.shift until fiber.nil? || fiber.alive?
  return if fiber.nil?

  if @wakeup_strategy == :immediately
    fiber.resume
  else
    Async::Task.current.reactor << fiber
  end
end
wakeup_fiber(fiber) click to toggle source
# File lib/async/resource_pool/simple.rb, line 107
# Wakes the given fiber according to the configured wakeup strategy.
#
# @param fiber [Fiber] the fiber to wake.
# @return the result of resuming the fiber (:immediately), or of appending it
#   to the current task's reactor (any other strategy).
def wakeup_fiber(fiber)
  return fiber.resume if @wakeup_strategy == :immediately

  Async::Task.current.reactor << fiber
end