module PgVersions

TODO: prepared statements?

Constants

VERSION

Public Class Methods

bump(*channels, connection: nil) click to toggle source

TODO: ensure this is called only once per transaction, or that all bumps occur in the same order in all transactions, to avoid deadlocks

# File lib/pg_versions/pg_versions.rb, line 77
# Bumps the version of each given channel and sends a notification for it.
#
# For every channel a row in pg_versions is inserted or updated: the stored
# instant is moved forward to the current clock timestamp, and the counter is
# reset to 0 when the instant advanced or incremented when it did not. The new
# version is then published with pg_notify on a channel of the same name.
#
# channels   - channel names; nested arrays are flattened and duplicates are
#              dropped before the query is built.
# connection - pg connection object responding to exec and escape_string, or
#              nil to let PgVersions.with_connection obtain one.
#
# Returns a Hash mapping each channel name to its new version as produced by
# string_to_version (an Array of Integers); an empty Hash when no channels
# were given.
def self.bump(*channels, connection: nil)
        PgVersions.with_connection(connection) { |pg_connection|
                # Duplicates must be removed: INSERT ... ON CONFLICT DO UPDATE fails with a
                # cardinality violation when one statement would touch the same row twice.
                # Sorting gives every caller the same row order (presumably to reduce the
                # deadlock risk mentioned in the TODO for this method).
                channels = [channels].flatten.uniq.sort
                return {} if channels.empty?
                quoted_channels = channels.each_with_index.map { |channel, i| "(#{i},'#{pg_connection.escape_string(channel)}')" }.join(", ")
                # SQL expression rendering a bumped row as "date,time,microseconds,counter"
                version_sql = "#{timestamp_to_integers('updated.instant')} || ',' || updated.counter::text"
                # table-wide share lock is there to mutually exclude table cleaner
                # clock_timestamp() - this has to be a timestamp after table lock got acquired
                rows = pg_connection.exec("
                        LOCK TABLE pg_versions IN ACCESS SHARE MODE;
                        WITH
                                to_bump(i, channel) AS (VALUES #{quoted_channels})
                                , current_instant(ts) AS (VALUES (clock_timestamp()))
                                , updated AS (
                                                INSERT INTO pg_versions(channel, instant, counter)
                                                SELECT to_bump.channel, (SELECT ts FROM current_instant), 0 FROM to_bump
                                                ON CONFLICT (channel) DO UPDATE SET
                                                        instant = GREATEST(pg_versions.instant, EXCLUDED.instant),
                                                        counter = CASE WHEN pg_versions.instant < EXCLUDED.instant THEN 0 ELSE pg_versions.counter + 1 END
                                                RETURNING channel, instant, pg_versions.counter
                                                )
                        SELECT DISTINCT
                                i
                                , #{version_sql} AS version
                                , pg_notify(updated.channel::text, #{version_sql})::text
                        FROM
                                to_bump
                                JOIN updated ON to_bump.channel = updated.channel;
                ")
                # "i" is the index into the sorted channel list that was sent in to_bump
                rows.map { |row| [channels[Integer(row["i"])], string_to_version(row["version"])] }.to_h
        }
end
create_table(connection=nil) click to toggle source
# File lib/pg_versions/pg_versions.rb, line 60
# Executes the SQL script create-table.sql (located two directories above
# this file) on the given connection.
#
# connection - pg connection object responding to exec, or nil to let
#              PgVersions.with_connection obtain one.
#
# Returns whatever the connection's exec call returns.
def self.create_table(connection=nil)
        script_path = File.dirname(__FILE__)+"/../../create-table.sql"
        PgVersions.with_connection(connection) do |pg_connection|
                # the script is read only once a connection is available
                pg_connection.exec(File.read(script_path))
        end
end
drop_table(connection=nil) click to toggle source
# File lib/pg_versions/pg_versions.rb, line 68
# Executes the SQL script drop-table.sql (located two directories above
# this file) on the given connection.
#
# connection - pg connection object responding to exec, or nil to let
#              PgVersions.with_connection obtain one.
#
# Returns whatever the connection's exec call returns.
def self.drop_table(connection=nil)
        script_path = File.dirname(__FILE__)+"/../../drop-table.sql"
        PgVersions.with_connection(connection) do |pg_connection|
                # the script is read only once a connection is available
                pg_connection.exec(File.read(script_path))
        end
end
read(*channels, connection: nil) click to toggle source
# File lib/pg_versions/pg_versions.rb, line 109
# Reads the current version of each given channel.
#
# Channels that have no row in pg_versions yet are passed to bump, so every
# requested channel ends up with a version in the result.
#
# channels   - channel names; nested arrays are flattened and duplicates are
#              dropped before the query is built.
# connection - pg connection object responding to exec and escape_string, or
#              nil to let PgVersions.with_connection obtain one.
#
# Returns a Hash mapping each channel name to its version as produced by
# string_to_version (an Array of Integers); an empty Hash when no channels
# were given.
def self.read(*channels, connection: nil)
        PgVersions.with_connection(connection) { |pg_connection|
                # Deduplicated so that a missing channel listed twice is not forwarded
                # twice to bump, whose upsert cannot touch the same row twice.
                channels = [channels].flatten.uniq.sort
                return {} if channels.empty?
                quoted_channels = channels.each_with_index.map { |channel, i| "(#{i},'#{pg_connection.escape_string(channel)}')" }.join(", ")
                versions = {}
                pg_connection.exec("
                        LOCK TABLE pg_versions IN ACCESS SHARE MODE;
                        WITH
                                channels(i, channel) AS (VALUES #{quoted_channels})
                        SELECT
                                i
                                , #{timestamp_to_integers('instant')} || ',' || counter AS version
                        FROM
                                channels
                                JOIN pg_versions ON pg_versions.channel = channels.channel
                        ORDER BY
                                i DESC;
                ").each { |row|
                        # "i" is the index into the sorted channel list sent in the VALUES clause
                        versions[channels[Integer(row["i"])]] = string_to_version(row["version"])
                }
                # channels the join did not return have no stored version yet
                missing_channels = channels.reject { |channel| versions.key?(channel) }
                #TODO: bump in the same query instead of calling bump
                versions.merge!(self.bump(missing_channels, connection: pg_connection)) unless missing_channels.empty?
                versions
        }
end
string_to_version(version_str) click to toggle source
# File lib/pg_versions/pg_versions.rb, line 55
# Parses a comma-separated version string into an Array of Integers.
#
# Every field is parsed strictly as base 10. Without the explicit base,
# Integer() auto-detects radix prefixes, so a zero-padded field such as
# "010203" would be read as octal and "0x10" as hexadecimal.
#
# version_str - String of comma-separated decimal integers, e.g. "20240131,101500,42,0".
#
# Returns an Array of Integers, one per field (empty for an empty string).
# Raises ArgumentError if a field is not a valid decimal integer.
def self.string_to_version(version_str)
        version_str.split(",").map { |field| Integer(field, 10) }
end
timestamp_to_integers(input) click to toggle source
# File lib/pg_versions/pg_versions.rb, line 37
# Builds an SQL expression that renders a timestamp as three comma-separated
# integers: date (YYYYMMDD), time of day (HH24MISS) and microseconds (US).
#
# input - SQL expression (as a String) yielding the timestamp, e.g. a column name.
#
# Returns the SQL expression as a String.
def self.timestamp_to_integers(input)
        %w[YYYYMMDD HH24MISS US]
                .map { |mask| "to_char(#{input}, '#{mask}')::integer" }
                .join(" || ',' || ")
end
with_connection(pg_connection) { |pg_connection| ... } click to toggle source
# File lib/pg_versions/pg_versions.rb, line 42
# Yields a pg connection object to the block.
#
# When a connection is passed in, it is yielded unchanged. Otherwise, if
# ActiveRecord is loaded, an adapter is checked out of the ActiveRecord
# connection pool for the duration of the block and its underlying driver
# connection is yielded.
#
# pg_connection - pg connection object to use, or nil to fall back to ActiveRecord.
#
# Returns the value of the block.
# Raises ConnectionAcquisitionFailedError when no connection was passed and
# ActiveRecord is not defined.
def self.with_connection(pg_connection)
        return yield(pg_connection) if pg_connection

        unless defined? ActiveRecord
                raise ConnectionAcquisitionFailedError, "Missing connection. Either pass pg connection object or import ActiveRecord."
        end

        ActiveRecord::Base.connection_pool.with_connection { |ar_connection|
                # raw_connection is the adapter's public accessor for the driver object;
                # reading the private @connection ivar breaks when the adapter renames it.
                yield ar_connection.raw_connection
        }
end