class Moneta::Pool::PoolManager
@api private
Public Class Methods
new(builder, min: 0, max: nil, ttl: nil, timeout: nil)
click to toggle source
# File lib/moneta/pool.rb, line 73
# Records the pool options, prepares empty bookkeeping state and finally
# launches the manager thread.
#
# @param builder [#build] object asked to build new stores (see add_store)
# @param min [Integer] number of stores opened eagerly and kept around
# @param max [Integer, nil] upper bound on open stores, or nil for none
# @param ttl [Numeric, nil] seconds after the last checkout before surplus
#   idle stores are closed, or nil to never close them
# @param timeout [Numeric, nil] seconds a checkout may wait before it is
#   answered with a timeout error, or nil to wait without limit
def initialize(builder, min: 0, max: nil, ttl: nil, timeout: nil)
  # Options supplied by the caller.
  @builder, @min, @max = builder, min, max
  @ttl, @timeout = ttl, timeout

  # Mailbox shared between callers and the manager thread.
  @inbox = []
  @mutex = ::Mutex.new
  @resource = ::ConditionVariable.new

  # Every store opened so far, and the subset currently idle.
  @stores = Set.new
  @available = []

  # Pending checkouts. Wait start times are tracked only when a timeout is
  # configured; otherwise @waiting_since is left unset.
  @waiting = []
  @waiting_since = [] if @timeout

  # Runtime state reported by the stats request.
  @last_checkout = nil
  @stopping = false
  @idle_time = nil

  # Must stay last: the thread reads the state initialised above.
  @thread = run
end
Public Instance Methods
check_in(store)
click to toggle source
# File lib/moneta/pool.rb, line 118 def check_in(store) push(:check_in, store) end
check_out()
click to toggle source
# File lib/moneta/pool.rb, line 112
# Asks the manager thread for a store and waits for its answer.
#
# The manager answers either with a store or with an exception object
# (for example when shutting down or when the wait timed out); exception
# answers are raised in the calling thread.
#
# @return [Object] the store that was checked out
# @raise [Exception] whatever exception object the manager replied with
def check_out
  outcome = push(:check_out, reply: true)

  case outcome
  when Exception then raise outcome
  else outcome
  end
end
kill!()
click to toggle source
# File lib/moneta/pool.rb, line 107 def kill! @thread.kill nil end
stats()
click to toggle source
# File lib/moneta/pool.rb, line 96 def stats push(:stats, reply: true) end
stop()
click to toggle source
# File lib/moneta/pool.rb, line 100
# Requests a graceful shutdown and blocks until the manager thread has
# finished.
#
# The thread is always joined, even if queueing the :stop message raises.
# Thread#value re-raises any exception that terminated the thread.
#
# @return [nil]
def stop
  begin
    push(:stop)
  ensure
    @thread.value
  end

  nil
end
Private Instance Methods
add_store()
click to toggle source
# File lib/moneta/pool.rb, line 226
# Builds a new store and registers it with the pool.
#
# The store is the last element of what the builder's #build returns.
#
# @return [Object] the newly built store
def add_store
  @builder.build.last.tap { |fresh| @stores.add(fresh) }
end
handle_check_in(store)
click to toggle source
# File lib/moneta/pool.rb, line 263
# Processes a returned store: it goes straight to the longest-waiting
# checkout request if there is one, otherwise back onto the idle list.
#
# @param store [Object] the store being checked in
def handle_check_in(store)
  if @waiting.empty?
    @available.push(store)
  else
    @waiting.shift.resolve(store)
    # Keep the wait-start list aligned with the waiting list.
    @waiting_since.shift if @timeout
  end
end
handle_check_out(reply)
click to toggle source
# File lib/moneta/pool.rb, line 232
# Answers a checkout request. In order of preference:
#
# 1. reply with a ShutdownError when the pool is stopping;
# 2. hand out an idle store;
# 3. build a new store if the pool is not at its maximum (a failure while
#    doing so is sent back to the caller as the reply);
# 4. otherwise park the request until a store is checked in.
#
# The time of the request is always recorded as the last checkout.
#
# @param reply [#resolve] receives the store or an exception object
def handle_check_out(reply)
  @last_checkout = Time.now

  return reply.resolve(ShutdownError.new("Shutting down")) if @stopping
  return reply.resolve(@available.pop) unless @available.empty?

  if stores_maxed?
    @waiting.push(reply)
    @waiting_since.push(Time.now) if @timeout
  else
    begin
      reply.resolve(add_store)
    rescue StandardError => e
      reply.resolve(e)
    end
  end
end
handle_request(request)
click to toggle source
# File lib/moneta/pool.rb, line 282
# Dispatches one inbox message to its handler.
#
# @param request [Array] a +[command, payload, reply]+ triple as queued by
#   push; unrecognised commands are ignored
def handle_request(request)
  command, payload, reply = request

  case command
  when :check_out then handle_check_out(reply)
  when :check_in  then handle_check_in(payload) # a store coming back
  when :stats     then handle_stats(reply)
  when :stop      then handle_stop              # graceful exit
  end
end
handle_stats(reply)
click to toggle source
# File lib/moneta/pool.rb, line 272
# Replies with a snapshot of the pool's state. Time and duration values are
# duplicated so the caller never shares objects with the manager thread.
#
# @param reply [#resolve] receives the statistics
def handle_stats(reply)
  # Only meaningful when timeouts are tracked and someone is waiting.
  longest_wait = @waiting_since.first.dup if @timeout && !@waiting_since.empty?

  reply.resolve(
    stores: @stores.length,
    available: @available.length,
    waiting: @waiting.length,
    longest_wait: longest_wait,
    stopping: @stopping,
    last_checkout: @last_checkout&.dup,
    idle_time: @idle_time.dup
  )
end
handle_stop()
click to toggle source
# File lib/moneta/pool.rb, line 250 def handle_stop @stopping = true # Reject anyone left waiting reject_waiting "Shutting down" end
handle_timed_out_requests()
click to toggle source
If there are checkout requests that have been waiting too long, feed them timeout errors.
# File lib/moneta/pool.rb, line 167
# Answers every checkout request that has waited at least the configured
# timeout with a TimeoutError stating how long it waited. Requests are
# examined oldest first; the scan stops at the first one still within its
# allowance. Does nothing when no timeout is configured.
def handle_timed_out_requests
  return unless @timeout

  until @waiting.empty? || (Time.now - @waiting_since.first) < @timeout
    started_at = @waiting_since.shift
    error = TimeoutError.new(format("Waited %<secs>f seconds", secs: Time.now - started_at))
    @waiting.shift.resolve(error)
  end
end
pop()
click to toggle source
# File lib/moneta/pool.rb, line 219 def pop @mutex.synchronize do @resource.wait(@mutex, timeout) if @inbox.empty? @inbox.shift end end
populate_stores()
click to toggle source
# File lib/moneta/pool.rb, line 151
# Opens stores until the pool holds at least the configured minimum,
# placing each new store on the idle list. Skipped while stopping.
def populate_stores
  return if @stopping

  until @stores.length >= @min
    @available.push(add_store)
  end
end
push(message, what = nil, reply: nil)
click to toggle source
This is called from outside the loop thread
# File lib/moneta/pool.rb, line 175
# Queues a message for the manager thread. Called from threads other than
# the manager thread.
#
# When a reply is requested, a Reply bound to the pool mutex is queued with
# the message and the caller waits on it; otherwise the call returns as
# soon as the message is queued.
#
# @param message [Symbol] the command, e.g. :check_out or :stop
# @param what [Object, nil] optional payload (the store for :check_in)
# @param reply [Boolean, nil] truthy to wait for an answer
# @return [Object, nil] the result of waiting on the reply, or nil when no
#   reply was requested
# @raise [ShutdownError] if a reply is requested but the manager thread is
#   no longer alive
def push(message, what = nil, reply: nil)
  @mutex.synchronize do
    if reply
      # Nobody would ever answer, so fail fast instead of waiting forever.
      raise ShutdownError, "Pool has been shutdown" unless @thread.alive?

      reply = Reply.new(@mutex)
    end

    @inbox.push([message, what, reply])
    @resource.signal
    reply.wait if reply
  end
end
reject_waiting(reason)
click to toggle source
# File lib/moneta/pool.rb, line 256
# Answers every parked checkout request with a ShutdownError and resets the
# wait-start list (when timeouts are tracked).
#
# @param reason [String] message for the ShutdownError
def reject_waiting(reason)
  loop do
    pending = @waiting.shift
    break unless pending

    pending.resolve(ShutdownError.new(reason))
  end

  @waiting_since = [] if @timeout
end
remove_unneeded_stores()
click to toggle source
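If the pool is stopping, close and drop every available store. Otherwise, if the last checkout was more than `ttl` seconds ago, close and drop available stores until only `min` stores remain.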
# File lib/moneta/pool.rb, line 157
# Closes and forgets idle stores that are no longer needed.
#
# Runs only when the pool is stopping, or when a ttl is configured and at
# least that many seconds have passed since the last checkout. While
# stopping, every idle store is removed; otherwise removal stops once the
# pool is down to its minimum size.
def remove_unneeded_stores
  return unless @stopping || (@ttl && @last_checkout && Time.now - @last_checkout >= @ttl)

  while @stopping || @stores.length > @min
    store = @available.pop
    break unless store

    begin
      store.close
    rescue StandardError
      # Best effort: a store that fails to close is still dropped.
      nil
    end
    @stores.delete(store)
  end
end
run()
click to toggle source
# File lib/moneta/pool.rb, line 124
# Starts the manager thread and returns it.
#
# The thread first opens the minimum number of stores, then loops until the
# pool is stopping and every store has been released. Each pass waits for a
# message (or a timeout), records the time spent waiting, handles the
# message if there is one, times out stale checkout requests and closes
# stores that are no longer needed.
#
# If the loop fails with a StandardError, all waiting checkout requests are
# rejected with the error's message and the error is re-raised inside the
# thread.
#
# @return [Thread] the manager thread
def run
  Thread.new do
    begin
      populate_stores

      until @stopping && @stores.empty?
        waiting_from = Time.now

        # Sleeps until a message arrives or a timeout elapses.
        message = pop

        # Kept for the stats request.
        @idle_time = Time.now - waiting_from

        handle_request(message) if message
        handle_timed_out_requests
        remove_unneeded_stores
      end
    rescue StandardError => e
      reject_waiting(e.message)
      raise
    end
  end
end
stores_available?()
click to toggle source
# File lib/moneta/pool.rb, line 207 def stores_available? !@available.empty? end
stores_maxed?()
click to toggle source
# File lib/moneta/pool.rb, line 215
# Whether the pool may not open any more stores.
#
# Uses >= rather than == so the pool still counts as full if it ever holds
# more stores than the maximum (for example when min is larger than max,
# since populate_stores only looks at min). With == such a pool would never
# be considered full and would grow on every checkout.
#
# @return [Boolean] false when no maximum is configured, otherwise whether
#   the number of stores has reached (or passed) the maximum
def stores_maxed?
  !@max.nil? && @stores.length >= @max
end
stores_unneeded?()
click to toggle source
# File lib/moneta/pool.rb, line 211 def stores_unneeded? @stores.length > @min end
timeout()
click to toggle source
This method calculates the number of seconds to wait for a signal on the condition variable, or `nil` if there is no need to time out.
Calculated based on the `:ttl` and `:timeout` options used during construction.
@return [Numeric, nil]
# File lib/moneta/pool.rb, line 192
# Works out how long the manager thread may sleep before it has something
# to do even if no message arrives.
#
# Two deadlines are considered, each floored at zero:
# * the time until surplus idle stores reach their ttl, and
# * the time until the longest-waiting checkout request times out.
#
# @return [Numeric, nil] seconds until the nearest deadline, or nil when
#   neither applies
def timeout
  deadlines = []

  # Time left before there will be stores that should be closed.
  if @ttl && @last_checkout && stores_available? && stores_unneeded?
    deadlines << [@ttl - (Time.now - @last_checkout), 0].max
  end

  # Time left before the oldest waiting request must be timed out.
  if @timeout && !@waiting_since.empty?
    deadlines << [@timeout - (Time.now - @waiting_since.first), 0].max
  end

  deadlines.min
end