class PG::LogicalReplication::Client

Attributes

command_builder[R]
connection[R]

Public Class Methods

new(connection) click to toggle source

@param connection [PG::Connection] Database Connection

# File lib/pg/logical_replication/client.rb, line 18
def initialize(connection)
  # The command builder is constructed from the same connection the client keeps.
  builder = PG::LogicalReplication::CommandBuilder.new(connection)

  @connection      = connection
  @command_builder = builder
end
type_map_for_queries(connection) click to toggle source
# File lib/pg/logical_replication/client.rb, line 9
def self.type_map_for_queries(connection)
  # Memoized: the map is built from the first connection passed in and the
  # same object is handed back on every later call, whatever the argument.
  return @type_map_for_queries if @type_map_for_queries

  @type_map_for_queries = PG::BasicTypeMapForQueries.new(connection)
end
type_map_for_results(connection) click to toggle source
# File lib/pg/logical_replication/client.rb, line 13
def self.type_map_for_results(connection)
  # Memoized: the map is built from the first connection passed in and the
  # same object is handed back on every later call, whatever the argument.
  return @type_map_for_results if @type_map_for_results

  @type_map_for_results = PG::BasicTypeMapForResults.new(connection)
end

Public Instance Methods

add_tables_to_publication(name, tables) click to toggle source

Adds tables to a publication

@param name [String] publication name @param tables [Array<String>] table names to add

# File lib/pg/logical_replication/client.rb, line 284
def add_tables_to_publication(name, tables)
  # The publication name and every table name are quoted as identifiers.
  publication = connection.quote_ident(name)
  table_list  = safe_list(tables)
  typed_exec("ALTER PUBLICATION #{publication} ADD TABLE #{table_list}")
end
alter_publication_options(name, options) click to toggle source

Alters parameters originally set by CREATE PUBLICATION

@param name [String] publication name @param options [Hash] parameters to set

# File lib/pg/logical_replication/client.rb, line 308
def alter_publication_options(name, options)
  # The command builder receives the base statement, the "SET" keyword and
  # the options hash; whatever it returns is executed as-is.
  statement = command_builder.command_with_options(
    "ALTER PUBLICATION #{connection.quote_ident(name)}", "SET", options
  )
  typed_exec(statement)
end
alter_subscription_options(name, options) click to toggle source

Alters parameters originally set by CREATE SUBSCRIPTION

@param name [String] subscription name @param options [Hash] parameters to set

# File lib/pg/logical_replication/client.rb, line 146
def alter_subscription_options(name, options)
  # The command builder receives the base statement, the "SET" keyword and
  # the options hash; whatever it returns is executed as-is.
  statement = command_builder.command_with_options(
    "ALTER SUBSCRIPTION #{connection.quote_ident(name)}", "SET", options
  )
  typed_exec(statement)
end
create_logical_replication_slot(name) click to toggle source

Creates a logical replication slot

@param name [String] logical replication slot name

# File lib/pg/logical_replication/client.rb, line 84
def create_logical_replication_slot(name)
  # The slot name is escaped as a SQL literal; the plugin argument is fixed to 'pgoutput'.
  slot_literal = connection.escape_literal(name)
  typed_exec("SELECT pg_create_logical_replication_slot(#{slot_literal}, 'pgoutput')")
end
create_publication(name, all_tables = false, tables = [], options = {}) click to toggle source

Creates a new publication

@param name [String] publication name @param all_tables [Boolean] replicate changes for all tables, including ones created in the future @param tables [Array<String>] tables to be added to the publication, ignored if all_tables is true @param options [Hash] optional parameters

# File lib/pg/logical_replication/client.rb, line 270
def create_publication(name, all_tables = false, tables = [], options = {})
  publication = connection.quote_ident(name)
  # all_tables wins over an explicit table list; an empty list adds no FOR clause.
  scope =
    if all_tables
      " FOR ALL TABLES"
    elsif tables.empty?
      ""
    else
      " FOR TABLE #{safe_list(tables)}"
    end
  base_command = "CREATE PUBLICATION #{publication}#{scope}"
  typed_exec(@command_builder.command_with_options(base_command, "WITH", options))
end
create_subscription(name, conninfo_hash, publications, options = {}) click to toggle source

Creates a subscription to a publisher node

@param name [String] subscription name @param conninfo_hash [Hash] publisher node connection info @param publications [Array<String>] publication names to subscribe to @param options [Hash] optional parameters for CREATE SUBSCRIPTION

# File lib/pg/logical_replication/client.rb, line 61
      def create_subscription(name, conninfo_hash, publications, options = {})
        # When slot creation is explicitly turned off (create_slot is false under
        # either a String or Symbol key) and no slot name was supplied, the slot
        # name is set to the subscription name. The addition is made on a merged
        # copy so the hash passed in by the caller is never modified.
        slot_named       = options.key?(:slot_name) || options.key?("slot_name")
        slot_creation_off = options["create_slot"] == false || options[:create_slot] == false
        options = options.merge(:slot_name => name) if slot_creation_off && !slot_named

        # The connection hash is turned into a conninfo string and escaped for
        # embedding between single quotes in the statement below.
        connection_string = connection.escape_string(PG::Connection.parse_connect_args(conninfo_hash))
        base_command = <<-SQL
          CREATE SUBSCRIPTION #{connection.quote_ident(name)}
                 CONNECTION '#{connection_string}'
                 PUBLICATION #{safe_list(publications)}
        SQL
        typed_exec(command_builder.command_with_options(base_command, "WITH", options))
      end
disable_subscription(name) click to toggle source

Disables the running subscription

@param name [String] subscription name

# File lib/pg/logical_replication/client.rb, line 138
def disable_subscription(name)
  # The subscription name is quoted as an identifier before interpolation.
  subscription = connection.quote_ident(name)
  typed_exec("ALTER SUBSCRIPTION #{subscription} DISABLE")
end
drop_publication(name, ifexists = false) click to toggle source

Remove a publication

@param name [String] publication name @param ifexists [Boolean] if true an error is not thrown when the publication does not exist

# File lib/pg/logical_replication/client.rb, line 333
def drop_publication(name, ifexists = false)
  # " IF EXISTS" is inserted only when ifexists is truthy.
  guard = ifexists ? " IF EXISTS" : ""
  typed_exec("DROP PUBLICATION#{guard} #{connection.quote_ident(name)}")
end
drop_replication_slot(name) click to toggle source

Drops the physical or logical replication slot. Note: a logical slot must be dropped while connected to the same database in which it was created.

@param name [String] replication slot name

# File lib/pg/logical_replication/client.rb, line 91
def drop_replication_slot(name)
  # The slot name is passed to the SQL function as an escaped literal.
  slot_literal = connection.escape_literal(name)
  typed_exec("SELECT pg_drop_replication_slot(#{slot_literal})")
end
drop_subscription(name, ifexists = false) click to toggle source

Disconnects the subscription and removes it

@param name [String] subscription name @param ifexists [Boolean] if true an error is not thrown when the subscription does not exist

# File lib/pg/logical_replication/client.rb, line 77
def drop_subscription(name, ifexists = false)
  # " IF EXISTS" is inserted only when ifexists is truthy.
  guard = ifexists ? " IF EXISTS" : ""
  typed_exec("DROP SUBSCRIPTION#{guard} #{connection.quote_ident(name)}")
end
enable_subscription(name) click to toggle source

Enables the previously disabled subscription

@param name [String] subscription name

# File lib/pg/logical_replication/client.rb, line 131
def enable_subscription(name)
  # The subscription name is quoted as an identifier before interpolation.
  subscription = connection.quote_ident(name)
  typed_exec("ALTER SUBSCRIPTION #{subscription} ENABLE")
end
lag_bytes() click to toggle source

Reports on replication lag from publisher to subscriber nodes. This method must be run on the publisher node.

@return [Array<Hash<String,String>>] List of returned lag and application names, one for each replication process
# File lib/pg/logical_replication/client.rb, line 28
      def lag_bytes
        typed_exec(<<-SQL).to_a
          SELECT
            pg_wal_lsn_diff(pg_current_wal_insert_lsn(), flush_lsn) AS lag_bytes,
            application_name
          FROM
            pg_stat_replication
        SQL
      end
publications() click to toggle source

Lists the current publications

@return [PG::Result] one row per publication, with the columns name, owner, puballtables, pubinsert, pubupdate and pubdelete

# File lib/pg/logical_replication/client.rb, line 245
      def publications
        typed_exec(<<-SQL)
          SELECT
            pubname::TEXT AS name,
            usename::TEXT AS owner,
            puballtables,
            pubinsert,
            pubupdate,
            pubdelete
          FROM
            pg_publication
            JOIN pg_user ON pubowner = usesysid
        SQL
      end
publishes?(publication_name) click to toggle source
# File lib/pg/logical_replication/client.rb, line 260
def publishes?(publication_name)
  # True when any row returned by #publications carries this name.
  publications.any? do |publication|
    publication["name"] == publication_name
  end
end
remove_tables_from_publication(name, tables) click to toggle source

Removes tables from a publication

@param name [String] publication name @param tables [Array<String>] table names to remove

# File lib/pg/logical_replication/client.rb, line 300
def remove_tables_from_publication(name, tables)
  # The publication name and every table name are quoted as identifiers.
  publication = connection.quote_ident(name)
  table_list  = safe_list(tables)
  typed_exec("ALTER PUBLICATION #{publication} DROP TABLE #{table_list}")
end
rename_publication(name, new_name) click to toggle source

Renames a publication

@param name [String] current publication name @param new_name [String] new publication name

# File lib/pg/logical_replication/client.rb, line 325
def rename_publication(name, new_name)
  # Both the current and the new name are quoted as identifiers.
  current = connection.quote_ident(name)
  renamed = connection.quote_ident(new_name)
  typed_exec("ALTER PUBLICATION #{current} RENAME TO #{renamed}")
end
rename_subscription(name, new_name) click to toggle source

Renames the subscription

@param name [String] current subscription name @param new_name [String] new subscription name

# File lib/pg/logical_replication/client.rb, line 163
def rename_subscription(name, new_name)
  # Both the current and the new name are quoted as identifiers.
  current = connection.quote_ident(name)
  renamed = connection.quote_ident(new_name)
  typed_exec("ALTER SUBSCRIPTION #{current} RENAME TO #{renamed}")
end
replication_slots() click to toggle source

Lists the current replication slots

@return [PG::Result] one row per replication slot, with the columns slot_name, plugin, slot_type, database, temporary and active

# File lib/pg/logical_replication/client.rb, line 229
      def replication_slots
        typed_exec(<<-SQL)
          SELECT
            slot_name::TEXT,
            plugin::TEXT,
            slot_type::TEXT,
            database::TEXT,
            temporary,
            active
          FROM pg_replication_slots
        SQL
      end
set_publication_owner(name, owner) click to toggle source

Sets the owner of a publication

@param name [String] publication name @param owner [String] new owner user name

# File lib/pg/logical_replication/client.rb, line 317
def set_publication_owner(name, owner)
  # The publication name and the owner name are both quoted as identifiers.
  publication = connection.quote_ident(name)
  new_owner   = connection.quote_ident(owner)
  typed_exec("ALTER PUBLICATION #{publication} OWNER TO #{new_owner}")
end
set_publication_tables(name, tables) click to toggle source

Sets the tables included in a publication

@param name [String] publication name @param tables [Array<String>] table names

# File lib/pg/logical_replication/client.rb, line 292
def set_publication_tables(name, tables)
  # The publication name and every table name are quoted as identifiers.
  publication = connection.quote_ident(name)
  table_list  = safe_list(tables)
  typed_exec("ALTER PUBLICATION #{publication} SET TABLE #{table_list}")
end
set_subscription_conninfo(name, conninfo_hash) click to toggle source

Updates a subscription connection string

@param name [String] subscription name @param conninfo_hash [Hash] new external connection hash to the publisher node

# File lib/pg/logical_replication/client.rb, line 99
def set_subscription_conninfo(name, conninfo_hash)
  # The hash is converted to a conninfo string, then escaped so it can sit
  # between the single quotes of the CONNECTION clause.
  conninfo = PG::Connection.parse_connect_args(conninfo_hash)
  escaped  = connection.escape_string(conninfo)
  typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} CONNECTION '#{escaped}'")
end
set_subscription_owner(name, owner) click to toggle source

Sets the owner of the subscription

@param name [String] subscription name @param owner [String] new owner user name

# File lib/pg/logical_replication/client.rb, line 155
def set_subscription_owner(name, owner)
  # The subscription name and the owner name are both quoted as identifiers.
  subscription = connection.quote_ident(name)
  new_owner    = connection.quote_ident(owner)
  typed_exec("ALTER SUBSCRIPTION #{subscription} OWNER TO #{new_owner}")
end
set_subscription_publications(name, publications, options = {}) click to toggle source

Changes list of subscribed publications

@param name [String] subscription name @param publications [Array<String>] publication names to subscribe to @param options [Hash] optional parameters

# File lib/pg/logical_replication/client.rb, line 109
      def set_subscription_publications(name, publications, options = {})
        base_command = <<-SQL
          ALTER SUBSCRIPTION #{connection.quote_ident(name)}
          SET PUBLICATION #{safe_list(publications)}
        SQL
        typed_exec(@command_builder.command_with_options(base_command, "WITH", options))
      end
subscriber?(dbname = nil) click to toggle source

Returns whether this database is subscribing to any publications

@return [Boolean] true if there are any subscriptions, false otherwise

# File lib/pg/logical_replication/client.rb, line 222
def subscriber?(dbname = nil)
  # Delegates the lookup (and the optional database filter) to #subscriptions.
  found = subscriptions(dbname)
  found.any?
end
subscriptions(dbname = nil) click to toggle source

Shows status and basic information about all subscriptions

@return [Array<Hash>] a list of subscriptions

keys:
  subscription_name
  database_name
  owner
  worker_count
  enabled
  subscription_dsn
  slot_name
  publications
  remote_replication_lsn
  local_replication_lsn
# File lib/pg/logical_replication/client.rb, line 181
      def subscriptions(dbname = nil)
        subscriptions = typed_exec(<<-SQL).to_a
          SELECT
            sub.subname::TEXT         AS subscription_name,
            pg_database.datname::TEXT AS database_name,
            pg_user.usename::TEXT     AS owner,
            COUNT(sub_stat.pid)       AS worker_count,
            sub.subenabled            AS enabled,
            sub.subconninfo           AS subscription_dsn,
            sub.subslotname::TEXT     AS slot_name,
            sub.subpublications       AS publications,
            stat.remote_lsn::TEXT     AS remote_replication_lsn,
            stat.local_lsn::TEXT      AS local_replication_lsn
          FROM
            pg_subscription AS sub
            JOIN pg_user
              ON sub.subowner = usesysid
            JOIN pg_database
              ON sub.subdbid = pg_database.oid
            LEFT JOIN pg_replication_origin_status stat
              ON concat('pg_', sub.oid) = stat.external_id
            LEFT JOIN pg_stat_subscription sub_stat
              ON sub_stat.subid = sub.oid AND sub_stat.pid IS NOT NULL
          GROUP BY
            sub.subname,
            pg_database.datname,
            pg_user.usename,
            sub.subenabled,
            sub.subconninfo,
            sub.subslotname,
            sub.subpublications,
            stat.remote_lsn,
            stat.local_lsn
        SQL

        dbname ? subscriptions.select { |s| s["database_name"] == dbname } : subscriptions
      end
sync_subscription(name, options = {}) click to toggle source

Fetch missing table information from publisher

@param name [String] subscription name @param options [Hash] optional parameters

# File lib/pg/logical_replication/client.rb, line 121
      def sync_subscription(name, options = {})
        base_command = <<-SQL
          ALTER SUBSCRIPTION #{connection.quote_ident(name)} REFRESH PUBLICATION
        SQL
        typed_exec(@command_builder.command_with_options(base_command, "WITH", options))
      end
tables_in_publication(name) click to toggle source

Lists the tables currently in the publication

@param name [String] publication name @return [Array<String>] table names

# File lib/pg/logical_replication/client.rb, line 341
      def tables_in_publication(name)
        typed_exec(<<-SQL, name).values.flatten
          SELECT tablename::TEXT
          FROM pg_publication_tables
          WHERE pubname = $1
        SQL
      end
wal_retained_bytes() click to toggle source

Reports on replication bytes of WAL being retained for each replication slot. This method must be run on the publisher node.

@return [Array<Hash<String,String>>] List of returned WAL bytes and replication slot names, one for each replication process
# File lib/pg/logical_replication/client.rb, line 43
      def wal_retained_bytes
        typed_exec(<<-SQL).to_a
          SELECT
            pg_wal_lsn_diff(pg_current_wal_insert_lsn(), restart_lsn) AS retained_bytes,
            slot_name::TEXT
          FROM
            pg_replication_slots
          WHERE
            plugin = 'pgoutput'
        SQL
      end

Private Instance Methods

safe_list(list) click to toggle source
# File lib/pg/logical_replication/client.rb, line 351
def safe_list(list)
  # Each element is quoted as an identifier, then the results are comma-joined.
  quoted = list.map { |item| connection.quote_ident(item) }
  quoted.join(", ")
end
typed_exec(sql, *params) click to toggle source
# File lib/pg/logical_replication/client.rb, line 355
def typed_exec(sql, *params)
  # The query type map is passed along with the bind parameters; the result
  # type map is applied to the returned result afterwards.
  query_map = self.class.type_map_for_queries(connection)
  raw       = connection.async_exec(sql, params, nil, query_map)
  raw.map_types!(self.class.type_map_for_results(connection))
end