# File lib/active_record/connection_adapters/redshift_adapter.rb, line 980 def primary_key(table) row = exec_query(<<-end_sql, 'SCHEMA').rows.first SELECT DISTINCT(attr.attname) FROM pg_attribute attr INNER JOIN pg_depend dep ON attr.attrelid = dep.refobjid AND attr.attnum = dep.refobjsubid INNER JOIN pg_constraint cons ON attr.attrelid = cons.conrelid AND attr.attnum = cons.conkey[1] WHERE cons.contype = 'p' AND dep.refobjid = '#{quote_table_name(table)}'::regclass end_sql row && row.first end
class ActiveRecord::ConnectionAdapters::RedshiftAdapter
The Redshift adapter works both with the native C (ruby.scripting.ca/postgres/) and the pure Ruby (available both as gem and from rubyforge.org/frs/?group_id=234&release_id=1944) drivers.
Options:
-
:host
- Defaults to “localhost”. -
:port
- Defaults to 5432. -
:username
- Defaults to nothing. -
:password
- Defaults to nothing. -
:database
- The name of the database. No default, must be provided. -
:schema_search_path
- An optional schema search path for the connection given as a string of comma-separated schema names. This is backward-compatible with the:schema_order
option. -
:encoding
- An optional client encoding that is used in aSET client_encoding TO <encoding>
call on the connection.
Constants
- ADAPTER_NAME
- NATIVE_DATABASE_TYPES
Public Class Methods
Initializes and connects a Redshift adapter.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 345 def initialize(connection, logger, connection_parameters, config) super(connection, logger) if config.fetch(:prepared_statements) { true } @visitor = Arel::Visitors::PostgreSQL.new self else @visitor = BindSubstitution.new self end connection_parameters.delete :prepared_statements @connection_parameters, @config = connection_parameters, config # @local_tz is initialized as nil to avoid warnings when connect tries to use it @local_tz = nil @table_alias_length = nil connect @statements = StatementPool.new @connection, config.fetch(:statement_limit) { 1000 } if redshift_version < 80002 raise "Your version of Redshift (#{redshift_version}) is too old, please upgrade!" end @local_tz = execute('SHOW TIME ZONE', 'SCHEMA').first["TimeZone"] end
Public Instance Methods
Is this connection alive and ready for queries?
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 379 def active? @connection.query 'SELECT 1' true rescue PGError false end
Returns ‘Redshift’ as adapter name for identification purposes.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 274 def adapter_name ADAPTER_NAME end
Adds a new column to the named table. See TableDefinition#column for details of the options you can use.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 1011 def add_column(table_name, column_name, type, options = {}) clear_cache! add_column_sql = "ALTER TABLE #{quote_table_name(table_name)} ADD COLUMN #{quote_column_name(column_name)} #{type_to_sql(type, options[:limit], options[:precision], options[:scale])}" add_column_options!(add_column_sql, options) execute add_column_sql end
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 1050 def add_index(*) # XXX nothing to do end
Begins a transaction.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 731 def begin_db_transaction execute "BEGIN" end
Changes the column of a table.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 1020 def change_column(table_name, column_name, type, options = {}) clear_cache! quoted_table_name = quote_table_name(table_name) execute "ALTER TABLE #{quoted_table_name} ALTER COLUMN #{quote_column_name(column_name)} TYPE #{type_to_sql(type, options[:limit], options[:precision], options[:scale])}" change_column_default(table_name, column_name, options[:default]) if options_include_default?(options) change_column_null(table_name, column_name, options[:null], options[:default]) if options.key?(:null) end
Changes the default value of a table column.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 1031 def change_column_default(table_name, column_name, default) clear_cache! execute "ALTER TABLE #{quote_table_name(table_name)} ALTER COLUMN #{quote_column_name(column_name)} SET DEFAULT #{quote(default)}" end
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 1036 def change_column_null(table_name, column_name, null, default = nil) clear_cache! unless null || default.nil? execute("UPDATE #{quote_table_name(table_name)} SET #{quote_column_name(column_name)}=#{quote(default)} WHERE #{quote_column_name(column_name)} IS NULL") end execute("ALTER TABLE #{quote_table_name(table_name)} ALTER #{quote_column_name(column_name)} #{null ? 'DROP' : 'SET'} NOT NULL") end
Clears the prepared statements cache.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 374 def clear_cache! @statements.clear end
Returns the list of all column definitions for a table.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 856 def columns(table_name, name = nil) # Limit, precision, and scale are all handled by the superclass. column_definitions(table_name).collect do |column_name, type, default, notnull| RedshiftColumn.new(column_name, default, type, notnull == 'f', @config[:read_timezone]) end end
Commits a transaction.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 736 def commit_db_transaction execute "COMMIT" end
Create a new Redshift database. Options include :owner
, :template
, :encoding
, :tablespace
, and :connection_limit
(note that MySQL uses :charset
while Redshift uses :encoding
).
Example:
create_database config[:database], config create_database 'foo_development', :encoding => 'unicode'
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 781 def create_database(name, options = {}) options = options.reverse_merge(:encoding => "utf8") option_string = options.symbolize_keys.sum do |key, value| case key when :owner " OWNER = \"#{value}\"" when :template " TEMPLATE = \"#{value}\"" when :encoding " ENCODING = '#{value}'" when :tablespace " TABLESPACE = \"#{value}\"" when :connection_limit " CONNECTION LIMIT = #{value}" else "" end end execute "CREATE DATABASE #{quote_table_name(name)}#{option_string}" end
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 749 def create_savepoint execute("SAVEPOINT #{current_savepoint_name}") end
Returns the current database name.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 864 def current_database query('select current_database()', 'SCHEMA')[0][0] end
Returns the current schema name.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 869 def current_schema query('SELECT current_schema', 'SCHEMA')[0][0] end
Disconnects from the database if already connected. Otherwise, this method does nothing.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 401 def disconnect! clear_cache! @connection.close rescue nil end
Returns the current database encoding format.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 874 def encoding query(<<-end_sql, 'SCHEMA')[0][0] SELECT pg_encoding_to_char(pg_database.encoding) FROM pg_database WHERE pg_database.datname LIKE '#{current_database}' end_sql end
Escapes binary strings for bytea input to the database.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 446 def escape_bytea(value) @connection.escape_bytea(value) if value end
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 702 def exec_delete(sql, name = 'SQL', binds = []) log(sql, name, binds) do result = binds.empty? ? exec_no_cache(sql, binds) : exec_cache(sql, binds) affected = result.cmd_tuples result.clear affected end end
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 691 def exec_query(sql, name = 'SQL', binds = []) log(sql, name, binds) do result = binds.empty? ? exec_no_cache(sql, binds) : exec_cache(sql, binds) ret = ActiveRecord::Result.new(result.fields, result_as_array(result)) result.clear return ret end end
Executes an SQL statement, returning a PGresult object on success or raising a PGError exception otherwise.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 681 def execute(sql, name = nil) log(sql, name) do @connection.async_exec(sql) end end
DATABASE STATEMENTS ======================================
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 562 def explain(arel, binds = []) sql = "EXPLAIN #{to_sql(arel, binds)}" ExplainPrettyPrinter.new.pp(exec_query(sql, 'EXPLAIN', binds)) end
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 1060 def index_name_length 63 end
Returns an array of indexes for the given table.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 851 def indexes(table_name, name = nil) [] end
Executes an INSERT query and returns the new record’s ID
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 611 def insert_sql(sql, name = nil, pk = nil, id_value = nil, sequence_name = nil) unless pk # Extract the table from the insert sql. Yuck. table_ref = extract_table_ref_from_insert_sql(sql) pk = primary_key(table_ref) if table_ref end if pk && use_insert_returning? select_value("#{sql} RETURNING #{quote_column_name(pk)}") elsif pk super last_insert_id_value(sequence_name || default_sequence_name(table_ref, pk)) else super end end
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 745 def outside_transaction? @connection.transaction_status == PGconn::PQTRANS_IDLE end
Returns just a table’s primary key
Checks the following cases:
-
table_name
-
“table.name”
-
schema_name.table_name
-
schema_name.“table.name”
-
“schema.name”.table_name
-
“schema.name”.“table.name”
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 511 def quote_table_name(name) schema, name_part = extract_pg_identifier_from_name(name.to_s) unless name_part quote_column_name(schema) else table_name, name_part = extract_pg_identifier_from_name(name_part) "#{quote_column_name(schema)}.#{quote_column_name(table_name)}" end end
Close then reopen the connection.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 387 def reconnect! clear_cache! @connection.reset @open_transactions = 0 configure_connection end
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 757 def release_savepoint execute("RELEASE SAVEPOINT #{current_savepoint_name}") end
Renames a column in a table.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 1045 def rename_column(table_name, column_name, new_column_name) clear_cache! execute "ALTER TABLE #{quote_table_name(table_name)} RENAME COLUMN #{quote_column_name(column_name)} TO #{quote_column_name(new_column_name)}" end
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 1057 def rename_index(table_name, old_name, new_name) end
Renames a table. Also renames a table’s primary key sequence if the sequence name matches the Active Record default.
Example:
rename_table('octopuses', 'octopi')
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 999 def rename_table(name, new_name) clear_cache! execute "ALTER TABLE #{quote_table_name(name)} RENAME TO #{quote_table_name(new_name)}" pk, seq = pk_and_sequence_for(new_name) if seq == "#{name}_#{pk}_seq" new_seq = "#{new_name}_#{pk}_seq" execute "ALTER TABLE #{quote_table_name(seq)} RENAME TO #{quote_table_name(new_seq)}" end end
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 394 def reset! clear_cache! super end
Aborts a transaction.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 741 def rollback_db_transaction execute "ROLLBACK" end
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 753 def rollback_to_savepoint execute("ROLLBACK TO SAVEPOINT #{current_savepoint_name}") end
Returns true if schema exists.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 842 def schema_exists?(name) exec_query(<<-SQL, 'SCHEMA').rows.first[0].to_i > 0 SELECT COUNT(*) FROM pg_namespace WHERE nspname = '#{name}' SQL end
Returns the active schema search path.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 894 def schema_search_path @schema_search_path ||= query('SHOW search_path', 'SCHEMA')[0][0] end
Sets the schema search path to a string of comma-separated schema names. Names beginning with $ have to be quoted (e.g. $user => ‘$user’). See: www.redshift.org/docs/current/static/ddl-schemas.html
This should be not be called manually but set in database.yml.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 886 def schema_search_path=(schema_csv) if schema_csv execute("SET search_path TO #{schema_csv}", 'SCHEMA') @schema_search_path = schema_csv end end
Executes a SELECT query and returns an array of rows. Each row is an array of field values.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 606 def select_rows(sql, name = nil) select_raw(sql, name).last end
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 905 def serial_sequence(table, column) result = exec_query(<<-eosql, 'SCHEMA') SELECT pg_get_serial_sequence('#{table}', '#{column}') eosql result.rows.first.first end
Set the authorized user for this session
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 538 def session_auth=(user) clear_cache! exec_query "SET SESSION AUTHORIZATION #{user}" end
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 713 def sql_for_insert(sql, pk, id_value, sequence_name, binds) unless pk # Extract the table from the insert sql. Yuck. table_ref = extract_table_ref_from_insert_sql(sql) pk = primary_key(table_ref) if table_ref end sql = "#{sql} RETURNING #{quote_column_name(pk)}" if pk && use_insert_returning? [sql, binds] end
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 687 def substitute_at(column, index) Arel::Nodes::BindParam.new "$#{index + 1}" end
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 424 def supports_ddl_transactions? true end
Returns true.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 434 def supports_explain? true end
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 284 def supports_index_sort_order? true end
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 420 def supports_insert_with_returning? false end
Returns true, since this connection adapter supports migrations.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 411 def supports_migrations? true end
Returns true, since this connection adapter supports savepoints.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 429 def supports_savepoints? true end
Returns true
, since this connection adapter supports prepared statement caching.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 280 def supports_statement_cache? true end
Returns the configured supported identifier length supported by Redshift
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 439 def table_alias_length @table_alias_length ||= query('SHOW max_identifier_length')[0][0].to_i end
Returns true if table exists. If the schema is not specified as part of name
then it will only find tables within the current schema search path (regardless of permissions to access tables in other schemas)
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 824 def table_exists?(name) schema, table = Utils.extract_schema_and_table(name.to_s) return false unless table binds = [[nil, table]] binds << [nil, schema] if schema exec_query(<<-SQL, 'SCHEMA').rows.first[0].to_i > 0 SELECT COUNT(*) FROM pg_class c LEFT JOIN pg_namespace n ON n.oid = c.relnamespace WHERE c.relkind in ('v','r') AND c.relname = '#{table.gsub(/(^"|"$)/,'')}' AND n.nspname = #{schema ? "'#{schema}'" : 'ANY (current_schemas(false))'} SQL end
Returns the list of all tables in the schema search path or a specified schema.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 813 def tables(name = nil) query(<<-SQL, 'SCHEMA').map { |row| "#{row[0]}.#{row[1]}" } SELECT schemaname, tablename FROM pg_tables WHERE schemaname = ANY (current_schemas(false)) SQL end
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 486 def type_cast(value, column) return super unless column case value when String return super unless 'bytea' == column.sql_type { :value => value, :format => 1 } else super end end
Maps logical Rails types to Redshift-specific data types.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 1065 def type_to_sql(type, limit = nil, precision = nil, scale = nil) case type.to_s when 'binary' # Redshift doesn't support limits on binary (bytea) columns. # The hard limit is 1Gb, because of a 32-bit size field, and TOAST. case limit when nil, 0..0x3fffffff; super(type) else raise(ActiveRecordError, "No binary type has byte size #{limit}.") end when 'integer' return 'integer' unless limit case limit when 1, 2; 'smallint' when 3, 4; 'integer' when 5..8; 'bigint' else raise(ActiveRecordError, "No integer type has byte size #{limit}. Use a numeric with precision 0 instead.") end else super end end
Unescapes bytea output from a database to the binary string it represents. NOTE: This is NOT an inverse of escape_bytea
! This is only to be used
on escaped binary output from database drive.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 453 def unescape_bytea(value) @connection.unescape_bytea(value) if value end
Executes an UPDATE query and returns the number of affected tuples.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 726 def update_sql(sql, name = nil) super.cmd_tuples end
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 761 def use_insert_returning? @use_insert_returning end
Protected Instance Methods
Returns the version of the connected Redshift server.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 1127 def redshift_version @connection.server_version end
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 1131 def translate_exception(exception, message) case exception.message when /duplicate key value violates unique constraint/ RecordNotUnique.new(message, exception) when /violates foreign key constraint/ InvalidForeignKey.new(message, exception) else super end end
Private Instance Methods
Configures the encoding, verbosity, schema search path, and time zone of the connection. This is called by connect
and should not be called manually.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 1213 def configure_connection if @config[:encoding] @connection.set_client_encoding(@config[:encoding]) end self.schema_search_path = @config[:schema_search_path] || @config[:schema_order] end
Connects to a Redshift server and sets up the adapter depending on the connected server’s characteristics.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 1200 def connect @connection = PGconn.connect(*@connection_parameters) # Money type has a fixed precision of 10 in Redshift 8.2 and below, and as of # Redshift 8.3 it has a fixed precision of 19. RedshiftColumn.extract_precision # should know about this but can't detect it there, so deal with it here. RedshiftColumn.money_precision = (redshift_version >= 80300) ? 19 : 10 configure_connection end
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 1149 def exec_cache(sql, binds) begin stmt_key = prepare_statement sql # Clear the queue @connection.get_last_result @connection.send_query_prepared(stmt_key, binds.map { |col, val| type_cast(val, col) }) @connection.block @connection.get_last_result rescue PGError => e # Get the PG code for the failure. Annoyingly, the code for # prepared statements whose return value may have changed is # FEATURE_NOT_SUPPORTED. Check here for more details: # http://git.redshift.org/gitweb/?p=redshift.git;a=blob;f=src/backend/utils/cache/plancache.c#l573 code = e.result.result_error_field(PGresult::PG_DIAG_SQLSTATE) if FEATURE_NOT_SUPPORTED == code @statements.delete sql_key(sql) retry else raise e end end end
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 1145 def exec_no_cache(sql, binds) @connection.async_exec(sql) end
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 1270 def extract_pg_identifier_from_name(name) match_data = name.start_with?('"') ? name.match(/\"([^\"]+)\"/) : name.match(/([^\.]+)/) if match_data rest = name[match_data[0].length, name.length] rest = rest[1, rest.length] if rest.start_with? "." [match_data[1], (rest.length > 0 ? rest : nil)] end end
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 1280 def extract_table_ref_from_insert_sql(sql) sql[/into\s+([^\(]*).*values\s*\(/i] $1.strip if $1 end
Prepare the statement if it hasn’t been prepared, return the statement key.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 1183 def prepare_statement(sql) sql_key = sql_key(sql) unless @statements.key? sql_key nextkey = @statements.next_key @connection.prepare nextkey, sql @statements[sql_key] = nextkey end @statements[sql_key] end
Executes a SELECT query and returns the results, performing any data type conversions that are required to be performed here instead of in RedshiftColumn
.
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 1228 def select(sql, name = nil, binds = []) exec_query(sql, name, binds).to_a end
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 1232 def select_raw(sql, name = nil) res = execute(sql, name) results = result_as_array(res) fields = res.fields res.clear return fields, results end
Returns the statement identifier for the client side cache of statements
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 1177 def sql_key(sql) "#{schema_search_path}-#{sql}" end
# File lib/active_record/connection_adapters/redshift_adapter.rb, line 1285 def table_definition TableDefinition.new(self) end