class Moneta::Pool::PoolManager

@api private

Public Class Methods

new(builder, min: 0, max: nil, ttl: nil, timeout: nil) click to toggle source
# File lib/moneta/pool.rb, line 73
def initialize(builder, min: 0, max: nil, ttl: nil, timeout: nil)
  @builder = builder
  @min = min
  @max = max
  @ttl = ttl
  @timeout = timeout

  @inbox = []
  @mutex = ::Mutex.new
  @resource = ::ConditionVariable.new

  @stores = Set.new
  @available = []
  @waiting = []
  @waiting_since = [] if @timeout
  @last_checkout = nil
  @stopping = false
  @idle_time = nil

  # Launch the manager thread
  @thread = run
end

Public Instance Methods

check_in(store) click to toggle source
# File lib/moneta/pool.rb, line 118
def check_in(store)
  push(:check_in, store)
end
check_out() click to toggle source
# File lib/moneta/pool.rb, line 112
def check_out
  reply = push(:check_out, reply: true)
  raise reply if Exception === reply
  reply
end
kill!() click to toggle source
# File lib/moneta/pool.rb, line 107
def kill!
  @thread.kill
  nil
end
stats() click to toggle source
# File lib/moneta/pool.rb, line 96
def stats
  push(:stats, reply: true)
end
stop() click to toggle source
# File lib/moneta/pool.rb, line 100
def stop
  push(:stop)
  nil
ensure
  @thread.value
end

Private Instance Methods

add_store() click to toggle source
# File lib/moneta/pool.rb, line 226
def add_store
  store = @builder.build.last
  @stores.add(store)
  store
end
handle_check_in(store) click to toggle source
# File lib/moneta/pool.rb, line 263
def handle_check_in(store)
  if !@waiting.empty?
    @waiting.shift.resolve(store)
    @waiting_since.shift if @timeout
  else
    @available.push(store)
  end
end
handle_check_out(reply) click to toggle source
# File lib/moneta/pool.rb, line 232
def handle_check_out(reply)
  @last_checkout = Time.now
  if @stopping
    reply.resolve(ShutdownError.new("Shutting down"))
  elsif !@available.empty?
    reply.resolve(@available.pop)
  elsif !stores_maxed?
    begin
      reply.resolve(add_store)
    rescue => e
      reply.resolve(e)
    end
  else
    @waiting.push(reply)
    @waiting_since.push(Time.now) if @timeout
  end
end
handle_request(request) click to toggle source
# File lib/moneta/pool.rb, line 282
def handle_request(request)
  cmd, what, reply = request
  case cmd
  when :check_out
    handle_check_out(reply)
  when :check_in
    # A checkin request
    handle_check_in(what)
  when :stats
    handle_stats(reply)
  when :stop
    # Graceful exit
    handle_stop
  end
end
handle_stats(reply) click to toggle source
# File lib/moneta/pool.rb, line 272
def handle_stats(reply)
  reply.resolve(stores: @stores.length,
                available: @available.length,
                waiting: @waiting.length,
                longest_wait: @timeout && !@waiting_since.empty? ? @waiting_since.first.dup : nil,
                stopping: @stopping,
                last_checkout: @last_checkout && @last_checkout.dup,
                idle_time: @idle_time.dup)
end
handle_stop() click to toggle source
# File lib/moneta/pool.rb, line 250
def handle_stop
  @stopping = true
  # Reject anyone left waiting
  reject_waiting "Shutting down"
end
handle_timed_out_requests() click to toggle source

If there are checkout requests that have been waiting too long, feed them timeout errors.

# File lib/moneta/pool.rb, line 167
def handle_timed_out_requests
  while @timeout && !@waiting.empty? && (Time.now - @waiting_since.first) >= @timeout
    waiting_since = @waiting_since.shift
    @waiting.shift.resolve(TimeoutError.new("Waited %<secs>f seconds" % { secs: Time.now - waiting_since }))
  end
end
pop() click to toggle source
# File lib/moneta/pool.rb, line 219
def pop
  @mutex.synchronize do
    @resource.wait(@mutex, timeout) if @inbox.empty?
    @inbox.shift
  end
end
populate_stores() click to toggle source
# File lib/moneta/pool.rb, line 151
def populate_stores
  return if @stopping
  @available.push(add_store) while @stores.length < @min
end
push(message, what = nil, reply: nil) click to toggle source

This is called from outside the loop thread

# File lib/moneta/pool.rb, line 175
def push(message, what = nil, reply: nil)
  @mutex.synchronize do
    raise ShutdownError, "Pool has been shutdown" if reply && !@thread.alive?
    reply &&= Reply.new(@mutex)
    @inbox.push([message, what, reply])
    @resource.signal
    reply.wait if reply
  end
end
reject_waiting(reason) click to toggle source
# File lib/moneta/pool.rb, line 256
def reject_waiting(reason)
  while reply = @waiting.shift
    reply.resolve(ShutdownError.new(reason))
  end
  @waiting_since = [] if @timeout
end
remove_unneeded_stores() click to toggle source

If the last checkout was more than timeout ago, drop any available stores

# File lib/moneta/pool.rb, line 157
def remove_unneeded_stores
  return unless @stopping || (@ttl && @last_checkout && Time.now - @last_checkout >= @ttl)
  while (@stopping || @stores.length > @min) and store = @available.pop
    store.close rescue nil
    @stores.delete(store)
  end
end
run() click to toggle source
# File lib/moneta/pool.rb, line 124
def run
  Thread.new do
    populate_stores

    until @stopping && @stores.empty?
      loop_start = Time.now

      # Block until a message arrives, or until we time out for some reason
      request = pop

      # Record how long we were idle, for stats purposes
      @idle_time = Time.now - loop_start

      # If a message arrived, handle it
      handle_request(request) if request

      # Handle any stale checkout requests
      handle_timed_out_requests
      # Drop any stores that are no longer needed
      remove_unneeded_stores
    end
  rescue => e
    reject_waiting(e.message)
    raise
  end
end
stores_available?() click to toggle source
# File lib/moneta/pool.rb, line 207
def stores_available?
  !@available.empty?
end
stores_maxed?() click to toggle source
# File lib/moneta/pool.rb, line 215
def stores_maxed?
  @max != nil && @stores.length == @max
end
stores_unneeded?() click to toggle source
# File lib/moneta/pool.rb, line 211
def stores_unneeded?
  @stores.length > @min
end
timeout() click to toggle source

This method calculates the number of seconds to wait for a signal on the condition variable, or ‘nil` if there is no need to time out.

Calculated based on the ‘:ttl` and `:timeout` options used during construction.

@return [Integer, nil]

# File lib/moneta/pool.rb, line 192
def timeout
  # Time to wait before there will be stores that should be closed
  ttl = if @ttl && @last_checkout && stores_available? && stores_unneeded?
          [@ttl - (Time.now - @last_checkout), 0].max
        end

  # Time to wait
  timeout = if @timeout && !@waiting_since.empty?
              longest_waiting = @waiting_since.first
              [@timeout - (Time.now - longest_waiting), 0].max
            end

  [ttl, timeout].compact.min
end