class PgSlice::CLI
Attributes
instance[RW]
Public Class Methods
exit_on_failure?()
click to toggle source
# File lib/pgslice/cli.rb, line 16 def self.exit_on_failure? ENV["PGSLICE_ENV"] != "test" end
new(*args)
click to toggle source
Calls superclass method
# File lib/pgslice/cli.rb, line 20 def initialize(*args) PgSlice::CLI.instance = self $stdout.sync = true $stderr.sync = true super end
Public Instance Methods
add_partitions(table)
click to toggle source
# File lib/pgslice/cli/add_partitions.rb, line 8 def add_partitions(table) original_table = create_table(table) table = options[:intermediate] ? original_table.intermediate_table : original_table trigger_name = original_table.trigger_name assert_table(table) future = options[:future] past = options[:past] tablespace = options[:tablespace] range = (-1 * past)..future period, field, cast, needs_comment, declarative, version = table.fetch_settings(original_table.trigger_name) unless period message = "No settings found: #{table}" message = "#{message}\nDid you mean to use --intermediate?" unless options[:intermediate] abort message end queries = [] if needs_comment queries << "COMMENT ON TRIGGER #{quote_ident(trigger_name)} ON #{quote_table(table)} is 'column:#{field},period:#{period},cast:#{cast}';" end # today = utc date today = round_date(Time.now.utc.to_date, period) schema_table = if !declarative table elsif options[:intermediate] original_table else table.partitions.last end # indexes automatically propagate in Postgres 11+ if version < 3 index_defs = schema_table.index_defs fk_defs = schema_table.foreign_keys else index_defs = [] fk_defs = [] end primary_key = schema_table.primary_key tablespace_str = tablespace.empty? ? "" : " TABLESPACE #{quote_ident(tablespace)}" added_partitions = [] range.each do |n| day = advance_date(today, period, n) partition = Table.new(original_table.schema, "#{original_table.name}_#{day.strftime(name_format(period))}") next if partition.exists? added_partitions << partition if declarative queries << <<-SQL CREATE TABLE #{quote_table(partition)} PARTITION OF #{quote_table(table)} FOR VALUES FROM (#{sql_date(day, cast, false)}) TO (#{sql_date(advance_date(day, period, 1), cast, false)})#{tablespace_str}; SQL else queries << <<-SQL CREATE TABLE #{quote_table(partition)} (CHECK (#{quote_ident(field)} >= #{sql_date(day, cast)} AND #{quote_ident(field)} < #{sql_date(advance_date(day, period, 1), cast)})) INHERITS (#{quote_table(table)})#{tablespace_str}; SQL end queries << "ALTER TABLE #{quote_table(partition)} ADD PRIMARY KEY (#{primary_key.map { |k| quote_ident(k) }.join(", ")});" if primary_key.any? index_defs.each do |index_def| queries << make_index_def(index_def, partition) end fk_defs.each do |fk_def| queries << make_fk_def(fk_def, partition) end end unless declarative # update trigger based on existing partitions current_defs = [] future_defs = [] past_defs = [] name_format = self.name_format(period) partitions = (table.partitions + added_partitions).uniq(&:name).sort_by(&:name) partitions.each do |partition| day = partition_date(partition, name_format) sql = "(NEW.#{quote_ident(field)} >= #{sql_date(day, cast)} AND NEW.#{quote_ident(field)} < #{sql_date(advance_date(day, period, 1), cast)}) THEN INSERT INTO #{quote_table(partition)} VALUES (NEW.*);" if day.to_date < today past_defs << sql elsif advance_date(day, period, 1) < today current_defs << sql else future_defs << sql end end # order by current period, future periods asc, past periods desc trigger_defs = current_defs + future_defs + past_defs.reverse if trigger_defs.any? queries << <<-SQL CREATE OR REPLACE FUNCTION #{quote_ident(trigger_name)}() RETURNS trigger AS $$ BEGIN IF #{trigger_defs.join("\n ELSIF ")} ELSE RAISE EXCEPTION 'Date out of range. Ensure partitions are created.'; END IF; RETURN NULL; END; $$ LANGUAGE plpgsql; SQL end end run_queries(queries) if queries.any? end
analyze(table)
click to toggle source
# File lib/pgslice/cli/analyze.rb, line 5 def analyze(table) table = create_table(table) parent_table = options[:swapped] ? table : table.intermediate_table analyze_list = parent_table.partitions + [parent_table] run_queries_without_transaction(analyze_list.map { |t| "ANALYZE VERBOSE #{quote_table(t)};" }) end
fill(table)
click to toggle source
# File lib/pgslice/cli/fill.rb, line 11 def fill(table) table = create_table(table) source_table = create_table(options[:source_table]) if options[:source_table] dest_table = create_table(options[:dest_table]) if options[:dest_table] if options[:swapped] source_table ||= table.retired_table dest_table ||= table else source_table ||= table dest_table ||= table.intermediate_table end assert_table(source_table) assert_table(dest_table) period, field, cast, _, declarative, _ = dest_table.fetch_settings(table.trigger_name) if period name_format = self.name_format(period) partitions = dest_table.partitions if partitions.any? starting_time = partition_date(partitions.first, name_format) ending_time = advance_date(partition_date(partitions.last, name_format), period, 1) end end schema_table = period && declarative ? partitions.last : table primary_key = schema_table.primary_key[0] abort "No primary key" unless primary_key max_source_id = nil begin max_source_id = source_table.max_id(primary_key) rescue PG::UndefinedFunction abort "Only numeric primary keys are supported" end max_dest_id = if options[:start] options[:start] elsif options[:swapped] dest_table.max_id(primary_key, where: options[:where], below: max_source_id) else dest_table.max_id(primary_key, where: options[:where]) end if max_dest_id == 0 && !options[:swapped] min_source_id = source_table.min_id(primary_key, field, cast, starting_time, options[:where]) max_dest_id = min_source_id - 1 if min_source_id end starting_id = max_dest_id fields = source_table.columns.map { |c| quote_ident(c) }.join(", ") batch_size = options[:batch_size] i = 1 batch_count = ((max_source_id - starting_id) / batch_size.to_f).ceil if batch_count == 0 log_sql "/* nothing to fill */" end while starting_id < max_source_id where = "#{quote_ident(primary_key)} > #{starting_id} AND #{quote_ident(primary_key)} <= #{starting_id + batch_size}" if starting_time where << " AND #{quote_ident(field)} >= #{sql_date(starting_time, cast)} AND #{quote_ident(field)} < #{sql_date(ending_time, cast)}" end if options[:where] where << " AND #{options[:where]}" end query = <<-SQL /* #{i} of #{batch_count} */ INSERT INTO #{quote_table(dest_table)} (#{fields}) SELECT #{fields} FROM #{quote_table(source_table)} WHERE #{where} SQL run_query(query) starting_id += batch_size i += 1 if options[:sleep] && starting_id <= max_source_id sleep(options[:sleep]) end end end
prep(table, column=nil, period=nil)
click to toggle source
# File lib/pgslice/cli/prep.rb, line 6 def prep(table, column=nil, period=nil) table = create_table(table) intermediate_table = table.intermediate_table trigger_name = table.trigger_name unless options[:partition] abort "Usage: \"pgslice prep TABLE --no-partition\"" if column || period abort "Can't use --trigger-based and --no-partition" if options[:trigger_based] end assert_table(table) assert_no_table(intermediate_table) if options[:partition] abort "Usage: \"pgslice prep TABLE COLUMN PERIOD\"" if !(column && period) abort "Column not found: #{column}" unless table.columns.include?(column) abort "Invalid period: #{period}" unless SQL_FORMAT[period.to_sym] end queries = [] # version summary # 1. trigger-based # 2. declarative, with indexes and foreign keys on child tables # 3. declarative, with indexes and foreign keys on parent table version = if options[:trigger_based] || server_version_num < 100000 1 elsif server_version_num < 110000 2 else 3 end declarative = version > 1 if declarative && options[:partition] queries << <<-SQL CREATE TABLE #{quote_table(intermediate_table)} (LIKE #{quote_table(table)} INCLUDING DEFAULTS INCLUDING CONSTRAINTS INCLUDING STORAGE INCLUDING COMMENTS) PARTITION BY RANGE (#{quote_ident(column)}); SQL if version == 3 index_defs = table.index_defs index_defs.each do |index_def| queries << make_index_def(index_def, intermediate_table) end table.foreign_keys.each do |fk_def| queries << make_fk_def(fk_def, intermediate_table) end end # add comment cast = table.column_cast(column) queries << <<-SQL COMMENT ON TABLE #{quote_table(intermediate_table)} is 'column:#{column},period:#{period},cast:#{cast},version:#{version}'; SQL else queries << <<-SQL CREATE TABLE #{quote_table(intermediate_table)} (LIKE #{quote_table(table)} INCLUDING ALL); SQL table.foreign_keys.each do |fk_def| queries << make_fk_def(fk_def, intermediate_table) end end if options[:partition] && !declarative queries << <<-SQL CREATE FUNCTION #{quote_ident(trigger_name)}() RETURNS trigger AS $$ BEGIN RAISE EXCEPTION 'Create partitions first.'; END; $$ LANGUAGE plpgsql; SQL queries << <<-SQL CREATE TRIGGER #{quote_ident(trigger_name)} BEFORE INSERT ON #{quote_table(intermediate_table)} FOR EACH ROW EXECUTE PROCEDURE #{quote_ident(trigger_name)}(); SQL cast = table.column_cast(column) queries << <<-SQL COMMENT ON TRIGGER #{quote_ident(trigger_name)} ON #{quote_table(intermediate_table)} is 'column:#{column},period:#{period},cast:#{cast}'; SQL end run_queries(queries) end
swap(table)
click to toggle source
# File lib/pgslice/cli/swap.rb, line 5 def swap(table) table = create_table(table) intermediate_table = table.intermediate_table retired_table = table.retired_table assert_table(table) assert_table(intermediate_table) assert_no_table(retired_table) queries = [ "ALTER TABLE #{quote_table(table)} RENAME TO #{quote_no_schema(retired_table)};", "ALTER TABLE #{quote_table(intermediate_table)} RENAME TO #{quote_no_schema(table)};" ] table.sequences.each do |sequence| queries << "ALTER SEQUENCE #{quote_ident(sequence["sequence_schema"])}.#{quote_ident(sequence["sequence_name"])} OWNED BY #{quote_table(table)}.#{quote_ident(sequence["related_column"])};" end queries.unshift("SET LOCAL lock_timeout = '#{options[:lock_timeout]}';") if server_version_num >= 90300 run_queries(queries) end
unprep(table)
click to toggle source
# File lib/pgslice/cli/unprep.rb, line 4 def unprep(table) table = create_table(table) intermediate_table = table.intermediate_table trigger_name = table.trigger_name assert_table(intermediate_table) queries = [ "DROP TABLE #{quote_table(intermediate_table)} CASCADE;", "DROP FUNCTION IF EXISTS #{quote_ident(trigger_name)}();" ] run_queries(queries) end
unswap(table)
click to toggle source
# File lib/pgslice/cli/unswap.rb, line 4 def unswap(table) table = create_table(table) intermediate_table = table.intermediate_table retired_table = table.retired_table assert_table(table) assert_table(retired_table) assert_no_table(intermediate_table) queries = [ "ALTER TABLE #{quote_table(table)} RENAME TO #{quote_no_schema(intermediate_table)};", "ALTER TABLE #{quote_table(retired_table)} RENAME TO #{quote_no_schema(table)};" ] table.sequences.each do |sequence| queries << "ALTER SEQUENCE #{quote_ident(sequence["sequence_schema"])}.#{quote_ident(sequence["sequence_name"])} OWNED BY #{quote_table(table)}.#{quote_ident(sequence["related_column"])};" end run_queries(queries) end
version()
click to toggle source
# File lib/pgslice/cli.rb, line 28 def version log("pgslice #{PgSlice::VERSION}") end