class ActiveRecord::ConnectionAdapters::RedshiftbulkAdapter
The Redshift adapter works with both the native C driver (ruby.scripting.ca/postgres/) and the pure-Ruby driver (available as a gem and from rubyforge.org/frs/?group_id=234&release_id=1944).
Options:
- :host - Defaults to "localhost".
- :port - Defaults to 5432.
- :username - Defaults to nothing.
- :password - Defaults to nothing.
- :database - The name of the database. No default, must be provided.
- :schema_search_path - An optional schema search path for the connection given as a string of comma-separated schema names. This is backward-compatible with the :schema_order option.
- :encoding - An optional client encoding that is used in a SET client_encoding TO <encoding> call on the connection.
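For illustration, a connection using these options might be established as in the following sketch. The adapter name 'redshiftbulk' and all host/credential values are assumptions for the example, not values mandated by this library.

  ActiveRecord::Base.establish_connection(
    :adapter            => 'redshiftbulk',        # assumed registered adapter name
    :host               => 'warehouse.example.com',
    :port               => 5439,
    :username           => 'deploy',
    :password           => 'secret',
    :database           => 'analytics',
    :schema_search_path => 'analytics,public',
    :encoding           => 'utf8'
  )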
Constants
- ADAPTER_NAME
- NATIVE_DATABASE_TYPES
Public Class Methods
Initializes and connects a Redshift adapter.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 349
def initialize(connection, logger, connection_parameters, config)
  super(connection, logger)

  if config.fetch(:prepared_statements) { true }
    @visitor = Arel::Visitors::PostgreSQL.new self
  else
    @visitor = BindSubstitution.new self
  end
  connection_parameters.delete :prepared_statements

  @connection_parameters, @config = connection_parameters, config

  # @local_tz is initialized as nil to avoid warnings when connect tries to use it
  @local_tz = nil
  @table_alias_length = nil

  connect
  @statements = StatementPool.new @connection, config.fetch(:statement_limit) { 1000 }

  if redshift_version < 80002
    raise "Your version of Redshift (#{redshift_version}) is too old, please upgrade!"
  end

  @local_tz = execute('SHOW TIME ZONE', 'SCHEMA').first["TimeZone"]
end
Public Instance Methods
Is this connection alive and ready for queries?
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 383
def active?
  @connection.query 'SELECT 1'
  true
rescue PGError
  false
end
Returns ‘Redshift’ as adapter name for identification purposes.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 274
def adapter_name
  ADAPTER_NAME
end
Adds a new column to the named table. See TableDefinition#column for details of the options you can use.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 1015
def add_column(table_name, column_name, type, options = {})
  clear_cache!
  add_column_sql = "ALTER TABLE #{quote_table_name(table_name)} ADD COLUMN #{quote_column_name(column_name)} #{type_to_sql(type, options[:limit], options[:precision], options[:scale])}"
  add_column_options!(add_column_sql, options)

  execute add_column_sql
end
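As a usage sketch (the table and column names here are hypothetical), the method issues a single ALTER TABLE ... ADD COLUMN statement:

  conn = ActiveRecord::Base.connection
  # Adds an "events.payload" varchar(512) column with an empty-string default.
  conn.add_column :events, :payload, :string, :limit => 512, :default => ''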
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 1054
def add_index(*)
  # XXX nothing to do
end
Begins a transaction.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 735
def begin_db_transaction
  execute "BEGIN"
end
Changes the column of a table.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 1024
def change_column(table_name, column_name, type, options = {})
  clear_cache!
  quoted_table_name = quote_table_name(table_name)

  execute "ALTER TABLE #{quoted_table_name} ALTER COLUMN #{quote_column_name(column_name)} TYPE #{type_to_sql(type, options[:limit], options[:precision], options[:scale])}"

  change_column_default(table_name, column_name, options[:default]) if options_include_default?(options)
  change_column_null(table_name, column_name, options[:null], options[:default]) if options.key?(:null)
end
Changes the default value of a table column.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 1035
def change_column_default(table_name, column_name, default)
  clear_cache!
  execute "ALTER TABLE #{quote_table_name(table_name)} ALTER COLUMN #{quote_column_name(column_name)} SET DEFAULT #{quote(default)}"
end
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 1040
def change_column_null(table_name, column_name, null, default = nil)
  clear_cache!
  unless null || default.nil?
    execute("UPDATE #{quote_table_name(table_name)} SET #{quote_column_name(column_name)}=#{quote(default)} WHERE #{quote_column_name(column_name)} IS NULL")
  end
  execute("ALTER TABLE #{quote_table_name(table_name)} ALTER #{quote_column_name(column_name)} #{null ? 'DROP' : 'SET'} NOT NULL")
end
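A short sketch of how these three methods combine in practice (table, column, and default values below are hypothetical examples):

  conn = ActiveRecord::Base.connection
  # Widen the column, give it a default, then add NOT NULL.
  conn.change_column :events, :payload, :string, :limit => 1024
  conn.change_column_default :events, :payload, 'n/a'
  # Passing a default here backfills existing NULLs before the
  # NOT NULL constraint is added, as the implementation above shows.
  conn.change_column_null :events, :payload, false, 'n/a'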
Clears the prepared statements cache.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 378
def clear_cache!
  @statements.clear
end
Returns the list of all column definitions for a table.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 860
def columns(table_name, name = nil)
  # Limit, precision, and scale are all handled by the superclass.
  column_definitions(table_name).collect do |column_name, type, default, notnull|
    RedshiftColumn.new(column_name, default, type, notnull == 'f', @config[:read_timezone])
  end
end
Commits a transaction.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 740
def commit_db_transaction
  execute "COMMIT"
end
Create a new Redshift database. Options include :owner, :template, :encoding, :tablespace, and :connection_limit (note that MySQL uses :charset while Redshift uses :encoding).
Example:
create_database config[:database], config
create_database 'foo_development', :encoding => 'unicode'
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 785
def create_database(name, options = {})
  options = options.reverse_merge(:encoding => "utf8")

  option_string = options.symbolize_keys.sum do |key, value|
    case key
    when :owner
      " OWNER = \"#{value}\""
    when :template
      " TEMPLATE = \"#{value}\""
    when :encoding
      " ENCODING = '#{value}'"
    when :tablespace
      " TABLESPACE = \"#{value}\""
    when :connection_limit
      " CONNECTION LIMIT = #{value}"
    else
      ""
    end
  end

  execute "CREATE DATABASE #{quote_table_name(name)}#{option_string}"
end
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 753
def create_savepoint
  execute("SAVEPOINT #{current_savepoint_name}")
end
Returns the current database name.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 868
def current_database
  query('select current_database()', 'SCHEMA')[0][0]
end
Returns the current schema name.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 873
def current_schema
  query('SELECT current_schema', 'SCHEMA')[0][0]
end
Disconnects from the database if already connected. Otherwise, this method does nothing.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 405
def disconnect!
  clear_cache!
  @connection.close rescue nil
end
Returns the current database encoding format.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 878
def encoding
  query(<<-end_sql, 'SCHEMA')[0][0]
    SELECT pg_encoding_to_char(pg_database.encoding) FROM pg_database
    WHERE pg_database.datname LIKE '#{current_database}'
  end_sql
end
Escapes binary strings for bytea input to the database.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 450
def escape_bytea(value)
  @connection.escape_bytea(value) if value
end
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 706
def exec_delete(sql, name = 'SQL', binds = [])
  log(sql, name, binds) do
    result = binds.empty? ? exec_no_cache(sql, binds) : exec_cache(sql, binds)
    affected = result.cmd_tuples
    result.clear
    affected
  end
end
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 695
def exec_query(sql, name = 'SQL', binds = [])
  log(sql, name, binds) do
    result = binds.empty? ? exec_no_cache(sql, binds) : exec_cache(sql, binds)

    ret = ActiveRecord::Result.new(result.fields, result_as_array(result))
    result.clear
    return ret
  end
end
Executes an SQL statement, returning a PGresult object on success or raising a PGError exception otherwise.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 685
def execute(sql, name = nil)
  log(sql, name) do
    @connection.async_exec(sql)
  end
end
DATABASE STATEMENTS
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 566
def explain(arel, binds = [])
  sql = "EXPLAIN #{to_sql(arel, binds)}"
  ExplainPrettyPrinter.new.pp(exec_query(sql, 'EXPLAIN', binds))
end
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 1064
def index_name_length
  63
end
Returns an array of indexes for the given table.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 855
def indexes(table_name, name = nil)
  []
end
Executes an INSERT query and returns the new record’s ID
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 615
def insert_sql(sql, name = nil, pk = nil, id_value = nil, sequence_name = nil)
  unless pk
    # Extract the table from the insert sql. Yuck.
    table_ref = extract_table_ref_from_insert_sql(sql)
    pk = primary_key(table_ref) if table_ref
  end

  if pk && use_insert_returning?
    select_value("#{sql} RETURNING #{quote_column_name(pk)}")
  elsif pk
    super
    last_insert_id_value(sequence_name || default_sequence_name(table_ref, pk))
  else
    super
  end
end
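A rough usage sketch of the logic above (the SQL, table, and primary key name are hypothetical): when a primary key is known and use_insert_returning? is true, the statement gains a RETURNING clause and the new id comes straight back from the INSERT; otherwise the adapter falls back to the sequence.

  conn = ActiveRecord::Base.connection
  # With use_insert_returning? => true this effectively runs
  #   INSERT INTO events (name) VALUES ('signup') RETURNING "id"
  new_id = conn.insert_sql("INSERT INTO events (name) VALUES ('signup')", nil, 'id')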
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 749
def outside_transaction?
  @connection.transaction_status == PGconn::PQTRANS_IDLE
end
Returns just a table’s primary key

# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 984
def primary_key(table)
  row = exec_query(<<-end_sql, 'SCHEMA').rows.first
    SELECT DISTINCT(attr.attname)
    FROM pg_attribute attr
    INNER JOIN pg_depend dep ON attr.attrelid = dep.refobjid AND attr.attnum = dep.refobjsubid
    INNER JOIN pg_constraint cons ON attr.attrelid = cons.conrelid AND attr.attnum = cons.conkey[1]
    WHERE cons.contype = 'p'
      AND dep.refobjid = '#{quote_table_name(table)}'::regclass
  end_sql

  row && row.first
end
Quotes the table name. Checks the following cases:

- table_name
- "table.name"
- schema_name.table_name
- schema_name."table.name"
- "schema.name".table_name
- "schema.name"."table.name"
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 515
def quote_table_name(name)
  schema, name_part = extract_pg_identifier_from_name(name.to_s)

  unless name_part
    quote_column_name(schema)
  else
    table_name, name_part = extract_pg_identifier_from_name(name_part)
    "#{quote_column_name(schema)}.#{quote_column_name(table_name)}"
  end
end
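To make the cases above concrete, a few illustrative calls. The outputs are inferred from the implementation, assuming quote_column_name double-quotes identifiers in the usual PostgreSQL style; the schema and table names are examples only.

  conn = ActiveRecord::Base.connection
  conn.quote_table_name('users')              # => '"users"'
  conn.quote_table_name('analytics.users')    # => '"analytics"."users"'
  conn.quote_table_name('"analytics".users')  # => '"analytics"."users"'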
Close then reopen the connection.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 391
def reconnect!
  clear_cache!
  @connection.reset
  @open_transactions = 0
  configure_connection
end
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 761
def release_savepoint
  execute("RELEASE SAVEPOINT #{current_savepoint_name}")
end
Renames a column in a table.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 1049
def rename_column(table_name, column_name, new_column_name)
  clear_cache!
  execute "ALTER TABLE #{quote_table_name(table_name)} RENAME COLUMN #{quote_column_name(column_name)} TO #{quote_column_name(new_column_name)}"
end
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 1061
def rename_index(table_name, old_name, new_name)
end
Renames a table. Also renames a table’s primary key sequence if the sequence name matches the Active Record default.
Example:
rename_table('octopuses', 'octopi')
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 1003
def rename_table(name, new_name)
  clear_cache!
  execute "ALTER TABLE #{quote_table_name(name)} RENAME TO #{quote_table_name(new_name)}"
  pk, seq = pk_and_sequence_for(new_name)
  if seq == "#{name}_#{pk}_seq"
    new_seq = "#{new_name}_#{pk}_seq"
    execute "ALTER TABLE #{quote_table_name(seq)} RENAME TO #{quote_table_name(new_seq)}"
  end
end
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 398
def reset!
  clear_cache!
  super
end
Aborts a transaction.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 745
def rollback_db_transaction
  execute "ROLLBACK"
end
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 757
def rollback_to_savepoint
  execute("ROLLBACK TO SAVEPOINT #{current_savepoint_name}")
end
Returns true if schema exists.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 846
def schema_exists?(name)
  exec_query(<<-SQL, 'SCHEMA').rows.first[0].to_i > 0
    SELECT COUNT(*)
    FROM pg_namespace
    WHERE nspname = '#{name}'
  SQL
end
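A minimal usage sketch (the schema name is a placeholder):

  conn = ActiveRecord::Base.connection
  conn.schema_exists?('analytics')  # => true if a matching pg_namespace row exists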
Returns the active schema search path.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 898
def schema_search_path
  @schema_search_path ||= query('SHOW search_path', 'SCHEMA')[0][0]
end
Sets the schema search path to a string of comma-separated schema names. Names beginning with $ have to be quoted (e.g. $user => ‘$user’). See: www.redshift.org/docs/current/static/ddl-schemas.html
This should not be called manually but set in database.yml.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 890
def schema_search_path=(schema_csv)
  if schema_csv
    execute("SET search_path TO #{schema_csv}", 'SCHEMA')
    @schema_search_path = schema_csv
  end
end
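In practice the path is supplied through the connection configuration rather than assigned at runtime; a hypothetical sketch of both (the schema names are examples):

  # Preferred: in database.yml / the connection config
  #   schema_search_path: analytics,public
  # Equivalent runtime assignment:
  ActiveRecord::Base.connection.schema_search_path = 'analytics,public'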
Executes a SELECT query and returns an array of rows. Each row is an array of field values.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 610
def select_rows(sql, name = nil)
  select_raw(sql, name).last
end
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 909
def serial_sequence(table, column)
  result = exec_query(<<-eosql, 'SCHEMA')
    SELECT pg_get_serial_sequence('#{table}', '#{column}')
  eosql
  result.rows.first.first
end
Set the authorized user for this session
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 542
def session_auth=(user)
  clear_cache!
  exec_query "SET SESSION AUTHORIZATION #{user}"
end
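Usage sketch (the role name is a placeholder); note from the code above that the prepared statement cache is cleared first, since cached plans may not be valid for the new role.

  conn = ActiveRecord::Base.connection
  # Switch the session to a less-privileged role for subsequent queries.
  conn.session_auth = 'readonly'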
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 717
def sql_for_insert(sql, pk, id_value, sequence_name, binds)
  unless pk
    # Extract the table from the insert sql. Yuck.
    table_ref = extract_table_ref_from_insert_sql(sql)
    pk = primary_key(table_ref) if table_ref
  end

  sql = "#{sql} RETURNING #{quote_column_name(pk)}" if pk && use_insert_returning?

  [sql, binds]
end
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 691
def substitute_at(column, index)
  Arel::Nodes::BindParam.new "$#{index + 1}"
end
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 428
def supports_ddl_transactions?
  true
end
Returns true.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 438
def supports_explain?
  true
end
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 288
def supports_import?
  true
end
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 284
def supports_index_sort_order?
  true
end
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 424
def supports_insert_with_returning?
  false
end
Returns true, since this connection adapter supports migrations.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 415
def supports_migrations?
  true
end
Returns true, since this connection adapter supports savepoints.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 433
def supports_savepoints?
  true
end
Returns true, since this connection adapter supports prepared statement caching.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 280
def supports_statement_cache?
  true
end
Returns the configured identifier length supported by Redshift.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 443
def table_alias_length
  @table_alias_length ||= query('SHOW max_identifier_length')[0][0].to_i
end
Returns true if the table exists. If the schema is not specified as part of name, it will only find tables within the current schema search path (regardless of permissions to access tables in other schemas).
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 828
def table_exists?(name)
  schema, table = Utils.extract_schema_and_table(name.to_s)
  return false unless table

  binds = [[nil, table]]
  binds << [nil, schema] if schema

  exec_query(<<-SQL, 'SCHEMA').rows.first[0].to_i > 0
    SELECT COUNT(*)
    FROM pg_class c
    LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
    WHERE c.relkind in ('v','r')
      AND c.relname = '#{table.gsub(/(^"|"$)/,'')}'
      AND n.nspname = #{schema ? "'#{schema}'" : 'ANY (current_schemas(false))'}
  SQL
end
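Usage sketch (the table and schema names are hypothetical), illustrating the search-path behaviour described above:

  conn = ActiveRecord::Base.connection
  conn.table_exists?('events')            # resolved via current_schemas(false)
  conn.table_exists?('analytics.events')  # explicit, schema-qualified lookup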
Returns the list of all tables in the schema search path or a specified schema.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 817
def tables(name = nil)
  query(<<-SQL, 'SCHEMA').map { |row| "#{row[0]}.#{row[1]}" }
    SELECT schemaname, tablename
    FROM pg_tables
    WHERE schemaname = ANY (current_schemas(false))
  SQL
end
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 490
def type_cast(value, column)
  return super unless column

  case value
  when String
    return super unless 'bytea' == column.sql_type
    { :value => value, :format => 1 }
  else
    super
  end
end
Maps logical Rails types to Redshift-specific data types.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 1069
def type_to_sql(type, limit = nil, precision = nil, scale = nil)
  case type.to_s
  when 'binary'
    # Redshift doesn't support limits on binary (bytea) columns.
    # The hard limit is 1Gb, because of a 32-bit size field, and TOAST.
    case limit
    when nil, 0..0x3fffffff; super(type)
    else raise(ActiveRecordError, "No binary type has byte size #{limit}.")
    end
  when 'integer'
    return 'integer' unless limit

    case limit
    when 1, 2; 'smallint'
    when 3, 4; 'integer'
    when 5..8; 'bigint'
    else raise(ActiveRecordError, "No integer type has byte size #{limit}. Use a numeric with precision 0 instead.")
    end
  else
    super
  end
end
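A few example mappings implied by the case statement above (other types fall through to the superclass):

  conn = ActiveRecord::Base.connection
  conn.type_to_sql(:integer)      # => "integer"
  conn.type_to_sql(:integer, 2)   # => "smallint"
  conn.type_to_sql(:integer, 8)   # => "bigint"
  conn.type_to_sql(:integer, 11)  # raises ActiveRecordError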
Unescapes bytea output from a database to the binary string it represents. NOTE: This is NOT an inverse of escape_bytea! This is only to be used on escaped binary output from the database driver.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 457
def unescape_bytea(value)
  @connection.unescape_bytea(value) if value
end
Executes an UPDATE query and returns the number of affected tuples.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 730
def update_sql(sql, name = nil)
  super.cmd_tuples
end
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 765
def use_insert_returning?
  @use_insert_returning
end
Protected Instance Methods
Returns the version of the connected Redshift server.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 1131
def redshift_version
  @connection.server_version
end
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 1135
def translate_exception(exception, message)
  case exception.message
  when /duplicate key value violates unique constraint/
    RecordNotUnique.new(message, exception)
  when /violates foreign key constraint/
    InvalidForeignKey.new(message, exception)
  else
    super
  end
end
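The practical effect is that driver-level failures surface as the usual Active Record exceptions. A hedged sketch, assuming a hypothetical User model whose users table has a unique index on email:

  begin
    User.create!(:email => 'dup@example.com')
    User.create!(:email => 'dup@example.com')
  rescue ActiveRecord::RecordNotUnique => e
    # The underlying PGError message mentioned "duplicate key value
    # violates unique constraint", so it was mapped to RecordNotUnique.
  end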
Private Instance Methods
Configures the encoding, verbosity, schema search path, and time zone of the connection. This is called by connect and should not be called manually.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 1217
def configure_connection
  if @config[:encoding]
    @connection.set_client_encoding(@config[:encoding])
  end
  self.schema_search_path = @config[:schema_search_path] || @config[:schema_order]
end
Connects to a Redshift server and sets up the adapter depending on the connected server’s characteristics.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 1204
def connect
  @connection = PGconn.connect(*@connection_parameters)

  # Money type has a fixed precision of 10 in Redshift 8.2 and below, and as of
  # Redshift 8.3 it has a fixed precision of 19. RedshiftColumn.extract_precision
  # should know about this but can't detect it there, so deal with it here.
  RedshiftColumn.money_precision = (redshift_version >= 80300) ? 19 : 10

  configure_connection
end
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 1153
def exec_cache(sql, binds)
  begin
    stmt_key = prepare_statement sql

    # Clear the queue
    @connection.get_last_result
    @connection.send_query_prepared(stmt_key, binds.map { |col, val| type_cast(val, col) })
    @connection.block
    @connection.get_last_result
  rescue PGError => e
    # Get the PG code for the failure. Annoyingly, the code for
    # prepared statements whose return value may have changed is
    # FEATURE_NOT_SUPPORTED. Check here for more details:
    # http://git.redshift.org/gitweb/?p=redshift.git;a=blob;f=src/backend/utils/cache/plancache.c#l573
    code = e.result.result_error_field(PGresult::PG_DIAG_SQLSTATE)
    if FEATURE_NOT_SUPPORTED == code
      @statements.delete sql_key(sql)
      retry
    else
      raise e
    end
  end
end
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 1149
def exec_no_cache(sql, binds)
  @connection.async_exec(sql)
end
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 1274
def extract_pg_identifier_from_name(name)
  match_data = name.start_with?('"') ? name.match(/\"([^\"]+)\"/) : name.match(/([^\.]+)/)

  if match_data
    rest = name[match_data[0].length, name.length]
    rest = rest[1, rest.length] if rest.start_with? "."
    [match_data[1], (rest.length > 0 ? rest : nil)]
  end
end
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 1284
def extract_table_ref_from_insert_sql(sql)
  sql[/into\s+([^\(]*).*values\s*\(/i]
  $1.strip if $1
end
Prepare the statement if it hasn’t been prepared, return the statement key.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 1187
def prepare_statement(sql)
  sql_key = sql_key(sql)
  unless @statements.key? sql_key
    nextkey = @statements.next_key
    @connection.prepare nextkey, sql
    @statements[sql_key] = nextkey
  end
  @statements[sql_key]
end
Executes a SELECT query and returns the results, performing any data type conversions that are required to be performed here instead of in RedshiftColumn.
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 1232
def select(sql, name = nil, binds = [])
  exec_query(sql, name, binds).to_a
end
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 1236
def select_raw(sql, name = nil)
  res = execute(sql, name)
  results = result_as_array(res)
  fields = res.fields
  res.clear
  return fields, results
end
Returns the statement identifier for the client side cache of statements
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 1181
def sql_key(sql)
  "#{schema_search_path}-#{sql}"
end
# File lib/active_record/connection_adapters/redshiftbulk_adapter.rb, line 1289
def table_definition
  TableDefinition.new(self)
end