class Octopus::Proxy
Constants
- BLOCK_KEY
- CURRENT_GROUP_KEY
- CURRENT_LOAD_BALANCE_OPTIONS_KEY
- CURRENT_MODEL_KEY
- CURRENT_SHARD_KEY
- CURRENT_SLAVE_GROUP_KEY
- FULLY_REPLICATED_KEY
- LAST_CURRENT_SHARD_KEY
Attributes
Public Class Methods
# File lib/octopus/proxy.rb, line 18 def initialize(config = Octopus.config) initialize_shards(config) initialize_replication(config) if !config.nil? && config['replicated'] end
Public Instance Methods
# File lib/octopus/proxy.rb, line 176 def block Thread.current[BLOCK_KEY] end
# File lib/octopus/proxy.rb, line 180 def block=(block) Thread.current[BLOCK_KEY] = block end
# File lib/octopus/proxy.rb, line 274 def check_schema_migrations(shard) OctopusModel.using(shard).connection.table_exists?( ActiveRecord::Migrator.schema_migrations_table_name, ) || OctopusModel.using(shard).connection.initialize_schema_migrations_table end
# File lib/octopus/proxy.rb, line 267 def clean_connection_proxy self.current_shard = Octopus.master_shard self.current_model = nil self.current_group = nil self.block = nil end
# File lib/octopus/proxy.rb, line 328 def clear_active_connections! with_each_healthy_shard(&:release_connection) end
# File lib/octopus/proxy.rb, line 332 def clear_all_connections! with_each_healthy_shard(&:disconnect!) end
# File lib/octopus/proxy.rb, line 324 def clear_query_cache with_each_healthy_shard { |v| v.connected? && safe_connection(v).clear_query_cache } end
# File lib/octopus/proxy.rb, line 336 def connected? @shards.any? { |_k, v| v.connected? } end
# File lib/octopus/proxy.rb, line 311 def connection_pool @shards[current_shard] end
# File lib/octopus/proxy.rb, line 146 def current_group Thread.current[CURRENT_GROUP_KEY] end
# File lib/octopus/proxy.rb, line 150 def current_group=(group_symbol) # TODO: Error message should include all groups if given more than one bad name. [group_symbol].flatten.compact.each do |group| fail "Nonexistent Group Name: #{group}" unless has_group?(group) end Thread.current[CURRENT_GROUP_KEY] = group_symbol end
# File lib/octopus/proxy.rb, line 168 def current_load_balance_options Thread.current[CURRENT_LOAD_BALANCE_OPTIONS_KEY] end
# File lib/octopus/proxy.rb, line 172 def current_load_balance_options=(options) Thread.current[CURRENT_LOAD_BALANCE_OPTIONS_KEY] = options end
# File lib/octopus/proxy.rb, line 101 def current_model Thread.current[CURRENT_MODEL_KEY] end
# File lib/octopus/proxy.rb, line 105 def current_model=(model) Thread.current[CURRENT_MODEL_KEY] = model.is_a?(ActiveRecord::Base) ? model.class : model end
# File lib/octopus/proxy.rb, line 109 def current_shard Thread.current[CURRENT_SHARD_KEY] ||= Octopus.master_shard end
# File lib/octopus/proxy.rb, line 113 def current_shard=(shard_symbol) if shard_symbol.is_a?(Array) self.current_slave_group = nil shard_symbol.each { |symbol| fail "Nonexistent Shard Name: #{symbol}" if @shards[symbol].nil? } elsif shard_symbol.is_a?(Hash) hash = shard_symbol shard_symbol = hash[:shard] slave_group_symbol = hash[:slave_group] load_balance_options = hash[:load_balance_options] if shard_symbol.nil? && slave_group_symbol.nil? fail 'Neither shard or slave group must be specified' end if shard_symbol.present? fail "Nonexistent Shard Name: #{shard_symbol}" if @shards[shard_symbol].nil? end if slave_group_symbol.present? if (@shards_slave_groups.try(:[], shard_symbol).present? && @shards_slave_groups[shard_symbol][slave_group_symbol].nil?) || (@shards_slave_groups.try(:[], shard_symbol).nil? && @slave_groups[slave_group_symbol].nil?) fail "Nonexistent Slave Group Name: #{slave_group_symbol} in shards config: #{@shards_config.inspect}" end end self.current_slave_group = slave_group_symbol self.current_load_balance_options = load_balance_options else fail "Nonexistent Shard Name: #{shard_symbol}" if @shards[shard_symbol].nil? end Thread.current[CURRENT_SHARD_KEY] = shard_symbol end
# File lib/octopus/proxy.rb, line 159 def current_slave_group Thread.current[CURRENT_SLAVE_GROUP_KEY] end
# File lib/octopus/proxy.rb, line 163 def current_slave_group=(slave_group_symbol) Thread.current[CURRENT_SLAVE_GROUP_KEY] = slave_group_symbol Thread.current[CURRENT_LOAD_BALANCE_OPTIONS_KEY] = nil if slave_group_symbol.nil? end
# File lib/octopus/proxy.rb, line 320 def disable_query_cache! with_each_healthy_shard { |v| v.connected? && safe_connection(v).disable_query_cache! } end
# File lib/octopus/proxy.rb, line 315 def enable_query_cache! clear_query_cache with_each_healthy_shard { |v| v.connected? && safe_connection(v).enable_query_cache! } end
# File lib/octopus/proxy.rb, line 192 def fully_replicated? @fully_replicated || Thread.current[FULLY_REPLICATED_KEY] end
Public: Whether or not a group exists with the given name converted to a string.
Returns a boolean.
# File lib/octopus/proxy.rb, line 200 def has_group?(group) @groups.key?(group.to_s) end
# File lib/octopus/proxy.rb, line 88 def initialize_replication(config) @replicated = true if config.key?('fully_replicated') @fully_replicated = config['fully_replicated'] else @fully_replicated = true end @slaves_list = @shards.keys.map(&:to_s).sort @slaves_list.delete('master') @slaves_load_balancer = Octopus.load_balancer.new(@slaves_list) end
# File lib/octopus/proxy.rb, line 23 def initialize_shards(config) @shards = HashWithIndifferentAccess.new @shards_slave_groups = HashWithIndifferentAccess.new @slave_groups = HashWithIndifferentAccess.new @groups = {} @adapters = Set.new @config = ActiveRecord::Base.connection_pool_without_octopus.spec.config unless config.nil? @entire_sharded = config['entire_sharded'] @shards_config = config[Octopus.rails_env] end @shards_config ||= [] @shards_config.each do |key, value| if value.is_a?(String) value = resolve_string_connection(value).merge(:octopus_shard => key) initialize_adapter(value['adapter']) @shards[key.to_sym] = connection_pool_for(value, "#{value['adapter']}_connection") elsif value.is_a?(Hash) && value.key?('adapter') value.merge!(:octopus_shard => key) initialize_adapter(value['adapter']) @shards[key.to_sym] = connection_pool_for(value, "#{value['adapter']}_connection") slave_group_configs = value.select do |_k, v| structurally_slave_group? v end if slave_group_configs.present? slave_groups = HashWithIndifferentAccess.new slave_group_configs.each do |slave_group_name, slave_configs| slaves = HashWithIndifferentAccess.new slave_configs.each do |slave_name, slave_config| @shards[slave_name.to_sym] = connection_pool_for(slave_config, "#{value['adapter']}_connection") slaves[slave_name.to_sym] = slave_name.to_sym end slave_groups[slave_group_name.to_sym] = Octopus::SlaveGroup.new(slaves) end @shards_slave_groups[key.to_sym] = slave_groups @sharded = true end elsif value.is_a?(Hash) @groups[key.to_s] = [] value.each do |k, v| fail 'You have duplicated shard names!' if @shards.key?(k.to_sym) initialize_adapter(v['adapter']) config_with_octopus_shard = v.merge(:octopus_shard => k) @shards[k.to_sym] = connection_pool_for(config_with_octopus_shard, "#{v['adapter']}_connection") @groups[key.to_s] << k.to_sym end if structurally_slave_group? value slaves = Hash[@groups[key.to_s].map { |v| [v, v] }] @slave_groups[key.to_sym] = Octopus::SlaveGroup.new(slaves) end end end @shards[:master] ||= ActiveRecord::Base.connection_pool_without_octopus if Octopus.master_shard == :master end
# File lib/octopus/proxy.rb, line 184 def last_current_shard Thread.current[LAST_CURRENT_SHARD_KEY] end
# File lib/octopus/proxy.rb, line 188 def last_current_shard=(last_current_shard) Thread.current[LAST_CURRENT_SHARD_KEY] = last_current_shard end
# File lib/octopus/proxy.rb, line 290 def method_missing(method, *args, &block) if should_clean_connection_proxy?(method) conn = select_connection self.last_current_shard = current_shard clean_connection_proxy conn.send(method, *args, &block) elsif should_send_queries_to_shard_slave_group?(method) send_queries_to_shard_slave_group(method, *args, &block) elsif should_send_queries_to_slave_group?(method) send_queries_to_slave_group(method, *args, &block) elsif should_send_queries_to_replicated_databases?(method) send_queries_to_selected_slave(method, *args, &block) else select_connection.send(method, *args, &block) end end
# File lib/octopus/proxy.rb, line 307 def respond_to?(method, include_private = false) super || select_connection.respond_to?(method, include_private) end
# File lib/octopus/proxy.rb, line 243 def run_queries_on_shard(shard, &_block) keeping_connection_proxy(shard) do using_shard(shard) do yield end end end
Rails 3.1 sets automatic_reconnect to false when it removes connection pool. Octopus
can potentially retain a reference to a closed connection pool. Previously, that would work since the pool would just reconnect, but in Rails 3.1 the flag prevents this.
# File lib/octopus/proxy.rb, line 223 def safe_connection(connection_pool) connection_pool.automatic_reconnect ||= true if !connection_pool.connected? && @shards[Octopus.master_shard].connection.query_cache_enabled connection_pool.connection.enable_query_cache! end connection_pool.connection end
# File lib/octopus/proxy.rb, line 231 def select_connection safe_connection(@shards[shard_name]) end
# File lib/octopus/proxy.rb, line 263 def send_queries_to_all_shards(&block) send_queries_to_multiple_shards(shard_names.uniq { |shard_name| @shards[shard_name] }, &block) end
# File lib/octopus/proxy.rb, line 257 def send_queries_to_group(group, &block) using_group(group) do send_queries_to_multiple_shards(shards_for_group(group), &block) end end
# File lib/octopus/proxy.rb, line 251 def send_queries_to_multiple_shards(shards, &block) shards.map do |shard| run_queries_on_shard(shard, &block) end end
# File lib/octopus/proxy.rb, line 344 def send_queries_to_shard_slave_group(method, *args, &block) send_queries_to_balancer(@shards_slave_groups[current_shard][current_slave_group], method, *args, &block) end
# File lib/octopus/proxy.rb, line 352 def send_queries_to_slave_group(method, *args, &block) send_queries_to_balancer(@slave_groups[current_slave_group], method, *args, &block) end
# File lib/octopus/proxy.rb, line 235 def shard_name current_shard.is_a?(Array) ? current_shard.first : current_shard end
Public: Retrieves names of all loaded shards.
Returns an array of shard names as symbols
# File lib/octopus/proxy.rb, line 207 def shard_names @shards.keys end
Public: Retrieves the defined shards for a given group.
Returns an array of shard names as symbols or nil if the group is not defined.
# File lib/octopus/proxy.rb, line 215 def shards_for_group(group) @groups.fetch(group.to_s, nil) end
# File lib/octopus/proxy.rb, line 239 def should_clean_table_name? @adapters.size > 1 end
# File lib/octopus/proxy.rb, line 340 def should_send_queries_to_shard_slave_group?(method) should_use_slaves_for_method?(method) && @shards_slave_groups.try(:[], current_shard).try(:[], current_slave_group).present? end
# File lib/octopus/proxy.rb, line 348 def should_send_queries_to_slave_group?(method) should_use_slaves_for_method?(method) && @slave_groups.try(:[], current_slave_group).present? end
# File lib/octopus/proxy.rb, line 280 def transaction(options = {}, &block) if !sharded && current_model_replicated? run_queries_on_shard(Octopus.master_shard) do select_connection.transaction(options, &block) end else select_connection.transaction(options, &block) end end
Protected Instance Methods
# File lib/octopus/proxy.rb, line 396 def connection_pool_for(adapter, config) if Octopus.rails4? arg = ActiveRecord::ConnectionAdapters::ConnectionSpecification.new(adapter.dup, config) else arg = ActiveRecord::Base::ConnectionSpecification.new(adapter.dup, config) end ActiveRecord::ConnectionAdapters::ConnectionPool.new(arg) end
# File lib/octopus/proxy.rb, line 438 def current_model_replicated? @replicated && (current_model.try(:replicated) || fully_replicated?) end
# File lib/octopus/proxy.rb, line 406 def initialize_adapter(adapter) @adapters << adapter begin require "active_record/connection_adapters/#{adapter}_adapter" rescue LoadError raise "Please install the #{adapter} adapter: `gem install activerecord-#{adapter}-adapter` (#{$ERROR_INFO})" end end
Temporarily block cleaning connection proxy and run the block
@see Octopus::Proxy#should_clean_connection? @see Octopus::Proxy#clean_connection_proxy
# File lib/octopus/proxy.rb, line 487 def keeping_connection_proxy(shard, &_block) last_block = block begin self.block = shard yield ensure self.block = last_block || nil end end
# File lib/octopus/proxy.rb, line 415 def resolve_string_connection(spec) if Octopus.rails41? resolver = ActiveRecord::ConnectionAdapters::ConnectionSpecification::Resolver.new({}) HashWithIndifferentAccess.new(resolver.spec(spec).config) else if Octopus.rails4? resolver = ActiveRecord::ConnectionAdapters::ConnectionSpecification::Resolver.new(spec, {}) else resolver = ActiveRecord::Base::ConnectionSpecification::Resolver.new(spec, {}) end HashWithIndifferentAccess.new(resolver.spec.config) end end
Temporarily switch ‘current_shard` to the next slave in a slave group and send queries to it while preserving `current_shard`
# File lib/octopus/proxy.rb, line 471 def send_queries_to_balancer(balancer, method, *args, &block) send_queries_to_slave(balancer.next(current_load_balance_options), method, *args, &block) end
# File lib/octopus/proxy.rb, line 442 def send_queries_to_selected_slave(method, *args, &block) if current_model.replicated || fully_replicated? selected_slave = @slaves_load_balancer.next current_load_balance_options else selected_slave = Octopus.master_shard end send_queries_to_slave(selected_slave, method, *args, &block) end
Temporarily switch ‘current_shard` to the specified slave and send queries to it while preserving `current_shard`
# File lib/octopus/proxy.rb, line 477 def send_queries_to_slave(slave, method, *args, &block) using_shard(slave) do select_connection.send(method, *args, &block) end end
# File lib/octopus/proxy.rb, line 429 def should_clean_connection_proxy?(method) method.to_s =~ /insert|select|execute/ && !current_model_replicated? && (!block || block != current_shard) end
Try to use slaves if and only if ‘replicated: true` is specified in `shards.yml` and no slaves groups are defined
# File lib/octopus/proxy.rb, line 434 def should_send_queries_to_replicated_databases?(method) @replicated && method.to_s =~ /select/ && !block && !slaves_grouped? end
We should use slaves if and only if its safe to do so.
We can safely use slaves when: (1) ‘replicated: true` is specified in `shards.yml` (2) The current model is `replicated()`, or `fully_replicated: true` is specified in `shards.yml` which means that
all the model is `replicated()`
(3) It’s a SELECT query while ensuring that we revert ‘current_shard` from the selected slave to the (shard’s) master not to make queries other than SELECT leak to the slave.
# File lib/octopus/proxy.rb, line 461 def should_use_slaves_for_method?(method) current_model_replicated? && method.to_s =~ /select/ end
# File lib/octopus/proxy.rb, line 465 def slaves_grouped? @slave_groups.present? end
# File lib/octopus/proxy.rb, line 529 def structurally_slave?(config) config.is_a?(Hash) && config.key?('adapter') end
# File lib/octopus/proxy.rb, line 533 def structurally_slave_group?(config) config.is_a?(Hash) && config.values.any? { |v| structurally_slave? v } end
Temporarily switch ‘current_group` and run the block
# File lib/octopus/proxy.rb, line 518 def using_group(group, &_block) older_group = current_group begin self.current_group = group yield ensure self.current_group = older_group end end
Temporarily switch ‘current_shard` and run the block
# File lib/octopus/proxy.rb, line 499 def using_shard(shard, &_block) older_shard = current_shard older_slave_group = current_slave_group older_load_balance_options = current_load_balance_options begin unless current_model && !current_model.allowed_shard?(shard) self.current_shard = shard end yield ensure self.current_shard = older_shard self.current_slave_group = older_slave_group self.current_load_balance_options = older_load_balance_options end end
Ensure that a single failing slave doesn’t take down the entire application
# File lib/octopus/proxy.rb, line 359 def with_each_healthy_shard @shards.each do |shard_name, v| begin yield(v) rescue => e if Octopus.robust_environment? Octopus.logger.error "Error on shard #{shard_name}: #{e.message}" else raise end end end conn_handler = ActiveRecord::Base.connection_handler if conn_handler.respond_to?(:connection_pool_list) # Rails 4+ ar_pools = conn_handler.connection_pool_list else # Rails 3.2 ar_pools = conn_handler.connection_pools.values end ar_pools.each do |pool| next if pool == @shards[:master] # Already handled this begin yield(pool) rescue => e if Octopus.robust_environment? Octopus.logger.error "Error on pool (spec: #{pool.spec}): #{e.message}" else raise end end end end