class ActiveRecord::Bogacs::ShareablePool

Constants

AtomicReference
DEBUG
DEFAULT_SHARED_POOL
MAX_THREAD_SHARING

Attributes

shared_size[R]

Public Class Methods

new(spec) click to toggle source

@override

Calls superclass method ActiveRecord::Bogacs::DefaultPool::new
# File lib/active_record/bogacs/shareable_pool.rb, line 29
def initialize(spec)
  super(spec)
  shared_size = spec.config[:shared_pool]
  shared_size = shared_size ? shared_size.to_f : DEFAULT_SHARED_POOL
  # size 0.0 - 1.0 assumes percentage of the pool size
  shared_size = ( @size * shared_size ).round if shared_size <= 1.0
  @shared_size = shared_size.to_i
  @shared_connections = ThreadSafe::Map.new # initial_capacity: @shared_size
end

Public Instance Methods

active_connection?() click to toggle source

@override

# File lib/active_record/bogacs/shareable_pool.rb, line 45
def active_connection?
  return true if current_thread[shared_connection_key]
  has_active_connection? # super
end
clear_reloadable_connections!() click to toggle source

@override

# File lib/active_record/bogacs/shareable_pool.rb, line 74
def clear_reloadable_connections!
  synchronize { @shared_connections.clear; super }
end
connection() click to toggle source

@override

# File lib/active_record/bogacs/shareable_pool.rb, line 40
def connection
  current_thread[shared_connection_key] || super
end
disconnect!() click to toggle source

@override

# File lib/active_record/bogacs/shareable_pool.rb, line 69
def disconnect!
  synchronize { @shared_connections.clear; super }
end
release_connection(owner_thread = Thread.current) click to toggle source

@override called from ConnectionManagement middle-ware (when finished)

# File lib/active_record/bogacs/shareable_pool.rb, line 51
def release_connection(owner_thread = Thread.current)
  conn_id = connection_cache_key(owner_thread)
  if reserved_conn = @thread_cached_conns.delete(conn_id)
    if shared_count = @shared_connections[reserved_conn]
      cheap_synchronize do # lock due #get_shared_connection ... not needed ?!
        # NOTE: the other option is to not care about shared here at all ...
        if shared_count.get == 0 # releasing a shared connection
          release_shared_connection(reserved_conn, owner_thread)
        #else return false
        end
      end
    else # check back-in non-shared connections
      checkin reserved_conn # (what super does)
    end
  end
end
release_shared_connection(connection, owner_thread = Thread.current) click to toggle source

Custom API :

# File lib/active_record/bogacs/shareable_pool.rb, line 100
def release_shared_connection(connection, owner_thread = Thread.current)
  shared_conn_key = shared_connection_key
  if connection == owner_thread[shared_conn_key]
    owner_thread[shared_conn_key] = nil
  end

  @shared_connections.delete(connection)
  shared_checkin connection # synchronized
end
remove(conn) click to toggle source

@override @note called from reap thus the pool should work with reaper

# File lib/active_record/bogacs/shareable_pool.rb, line 80
def remove(conn)
  synchronize { @shared_connections.delete(conn); super }
end
with_shared_connection() { |connection| ... } click to toggle source
# File lib/active_record/bogacs/shareable_pool.rb, line 110
def with_shared_connection
  shared_conn_key = shared_connection_key
  # with_shared_connection call nested in the same thread
  if connection = Thread.current[shared_conn_key]
    emulated_checkout(connection)
    return yield connection
  end

  start = Time.now if DEBUG
  begin
    # if there's a 'regular' connection on the thread use it as super
    if has_active_connection? # for current thread
      connection = self.connection # do not mark as shared
      DEBUG && debug("with_shared_conn 10 got active = #{connection.to_s}")
    # otherwise if we have a shared connection - use that one :
    elsif connection = get_shared_connection
      emulated_checkout(connection); shared = true
      DEBUG && debug("with_shared_conn 20 got shared = #{connection.to_s}")
    else
      shared = true
      synchronize do
        # check shared again as/if threads end up sync-ing up here :
        if connection = get_shared_connection
          emulated_checkout(connection)
          DEBUG && debug("with_shared_conn 21 got shared = #{connection.to_s}")
        end # here we acquire but a connection from the pool
        # TODO the bottle-neck for concurrency doing sync { checkout } :
        unless connection # here we acquire a connection from the pool
          connection = self.checkout # might block if pool fully used
          add_shared_connection(connection)
          DEBUG && debug("with_shared_conn 30 acq shared = #{connection.to_s}")
        end
      end
    end

    Thread.current[shared_conn_key] = connection if shared

    DEBUG && debug("with_shared_conn obtaining a connection took #{(Time.now - start) * 1000}ms")
    yield connection
  ensure
    Thread.current[shared_conn_key] = nil if shared
    rem_shared_connection(connection) if shared && connection
  end
end

Private Instance Methods

acquire_connection_no_wait?() click to toggle source
# File lib/active_record/bogacs/shareable_pool.rb, line 161
def acquire_connection_no_wait?
  synchronize do
    @connections.size < @size || @available.send(:can_remove_no_wait?)
    #return true if @connections.size < @size
    # @connections.size < @size || Queue#can_remove_no_wait? :
    #queue = @available.instance_variable_get(:@queue)
    #num_waiting = @available.instance_variable_get(:@num_waiting)
    #queue.size > num_waiting
  end
end
add_shared_connection(connection) click to toggle source
# File lib/active_record/bogacs/shareable_pool.rb, line 219
def add_shared_connection(connection)
  @shared_connections[connection] = AtomicReference.new(1)
end
debug(msg) click to toggle source
# File lib/active_record/bogacs/shareable_pool.rb, line 274
def debug(msg); DEBUG.debug msg end
emulated_checkin(connection) click to toggle source
# File lib/active_record/bogacs/shareable_pool.rb, line 233
def emulated_checkin(connection)
  # NOTE: not sure we'd like to run `run_callbacks :checkin {}` here ...
  connection.expire if connection.owner.equal? Thread.current
end
emulated_checkout(connection) click to toggle source
# File lib/active_record/bogacs/shareable_pool.rb, line 238
def emulated_checkout(connection)
  # NOTE: not sure we'd like to run `run_callbacks :checkout {}` here ...
  connection.lease unless connection.in_use? # connection.verify! auto-reconnect should do this
end
get_shared_connection() click to toggle source

get a (shared) connection that is least shared among threads (or nil) nil gets returned if it’s ‘better’ to checkout a new one to be shared … to better utilize shared connection reuse among multiple threads

# File lib/active_record/bogacs/shareable_pool.rb, line 181
def get_shared_connection # (lock = nil)
  least_count = MAX_THREAD_SHARING + 1; least_shared = nil
  shared_connections_size = @shared_connections.size

  @shared_connections.each_pair do |connection, shared_count|
    next if shared_count.get >= MAX_THREAD_SHARING
    if ( shared_count = shared_count.get ) < least_count
      DEBUG && debug(" get_shared_conn loop : #{connection.to_s} shared #{shared_count}-time(s)")
      # ! DO NOT return connection if shared_count == 0
      least_count = shared_count; least_shared = connection
    end
  end

  if least_count > 0
    if shared_connections_size < @shared_connections.size
      DEBUG && debug(" get_shared_conn retry (shared connection added)")
      return get_shared_connection # someone else added something re-try
    end
    if ( @shared_connections.size < @shared_size ) && acquire_connection_no_wait?
      DEBUG && debug(" get_shared_conn return none - acquire from pool")
      return nil # we should rather 'get' a new shared one from the pool
    end
  end

  # we did as much as could without a lock - now sync due possible release
  cheap_synchronize do # TODO although this likely might be avoided ...
    # should try again if possibly the same connection got released :
    unless least_count = @shared_connections[least_shared]
      DEBUG && debug(" get_shared_conn retry (connection got released)")
      return get_shared_connection
    end
    least_count.update { |v| v + 1 }
  end if least_shared

  DEBUG && debug(" get_shared_conn least shared = #{least_shared.to_s}")
  least_shared # might be nil in that case we'll likely wait (as super)
end
has_active_connection?() click to toggle source
# File lib/active_record/bogacs/shareable_pool.rb, line 157
def has_active_connection? # super.active_connection?
  @thread_cached_conns.fetch(connection_cache_key(current_thread), nil)
end
rem_shared_connection(connection) click to toggle source
# File lib/active_record/bogacs/shareable_pool.rb, line 223
def rem_shared_connection(connection)
  if shared_count = @shared_connections[connection]
     # shared_count.update { |v| v - 1 } # NOTE: likely fine without lock!
     cheap_synchronize do # give it back to the pool
       shared_count.update { |v| v - 1 } # might give it back if :
       release_shared_connection(connection) if shared_count.get == 0
     end
  end
end
shared_checkin(conn) click to toggle source
# File lib/active_record/bogacs/shareable_pool.rb, line 243
def shared_checkin(conn)
  #conn.lock.synchronize do
  synchronize do
    _run_checkin_callbacks(conn) if conn.owner.equal? Thread.current

    @available.add conn
  end
  #end
end
shared_connection_key() click to toggle source
# File lib/active_record/bogacs/shareable_pool.rb, line 253
def shared_connection_key
  @shared_connection_key ||= :"shared_pool_connection##{object_id}"
end