module PgVersions
TODO: prepared statements?
Constants
- VERSION
Public Class Methods
bump(*channels, connection: nil)
click to toggle source
TODO: ensure this is called only once per transaction, or that all bumps occur in the same order in all transactions, to avoid deadlocks
# File lib/pg_versions/pg_versions.rb, line 77
#
# Bumps the version of every given channel and notifies listeners.
#
# For each channel a row is inserted into (or updated in) the pg_versions
# table: a fresh row gets the current instant and counter 0; an existing row
# keeps the greater of the stored and the new instant, with the counter reset
# to 0 when the instant advanced and incremented otherwise. The resulting
# version string is also sent with pg_notify on a notification channel of the
# same name.
#
# channels   - channel names (Strings); nested arrays are flattened and
#              repeated names are collapsed.
# connection - optional connection object handed to PgVersions.with_connection.
#
# Returns a Hash mapping each channel name to its new version as produced by
# string_to_version; an empty Hash when no channels were given.
def self.bump(*channels, connection: nil)
  PgVersions.with_connection(connection) do |pg_connection|
    # uniq: a repeated channel would make the single INSERT ... ON CONFLICT
    # DO UPDATE statement touch the same row twice, which PostgreSQL rejects.
    # sort: keeps rows in a stable order across callers.
    channels = channels.flatten.uniq.sort
    return {} if channels.empty?

    quoted_channels = channels.each_with_index.map do |channel, i|
      "(#{i},'#{pg_connection.escape_string(channel)}')"
    end.join(", ")

    # table-wide share lock is there to mutually exclude table cleaner
    # clock_timestamp() - this has to be a timestamp after table lock got acquired
    rows = pg_connection.exec(<<~SQL)
      LOCK TABLE pg_versions IN ACCESS SHARE MODE;
      WITH
        to_bump(i, channel) AS (VALUES #{quoted_channels})
        , current_instant(ts) AS (VALUES (clock_timestamp()))
        , updated AS (
          INSERT INTO pg_versions(channel, instant, counter)
          SELECT to_bump.channel, (SELECT ts FROM current_instant), 0 FROM to_bump
          ON CONFLICT (channel) DO UPDATE SET
            instant = GREATEST(pg_versions.instant, EXCLUDED.instant),
            counter = CASE WHEN pg_versions.instant < EXCLUDED.instant THEN 0 ELSE pg_versions.counter + 1 END
          RETURNING channel, instant, pg_versions.counter
        )
      SELECT DISTINCT
        i
        , #{timestamp_to_integers('updated.instant')} || ',' || updated.counter::text AS version
        , pg_notify(updated.channel::text, #{timestamp_to_integers('updated.instant')} || ',' || updated.counter::text)::text
      FROM
        to_bump
        JOIN updated ON to_bump.channel = updated.channel;
    SQL

    # "i" is the index into the sorted channel list that was sent to the server.
    rows.map { |row| [channels[Integer(row["i"])], string_to_version(row["version"])] }.to_h
  end
end
create_table(connection=nil)
click to toggle source
# File lib/pg_versions/pg_versions.rb, line 60
#
# Executes the create-table.sql script located two directories above this
# file on the given (or automatically acquired) connection.
#
# connection - optional connection object handed to PgVersions.with_connection.
#
# Returns whatever the connection's exec call returns.
def self.create_table(connection=nil)
  PgVersions.with_connection(connection) do |pg_connection|
    # File.read instead of Kernel#open: it only ever treats the argument as a
    # file path and closes the file for us.
    sql_path = File.join(File.dirname(__FILE__), "..", "..", "create-table.sql")
    pg_connection.exec(File.read(sql_path))
  end
end
drop_table(connection=nil)
click to toggle source
# File lib/pg_versions/pg_versions.rb, line 68
#
# Executes the drop-table.sql script located two directories above this
# file on the given (or automatically acquired) connection.
#
# connection - optional connection object handed to PgVersions.with_connection.
#
# Returns whatever the connection's exec call returns.
def self.drop_table(connection=nil)
  PgVersions.with_connection(connection) do |pg_connection|
    # File.read instead of Kernel#open: it only ever treats the argument as a
    # file path and closes the file for us.
    sql_path = File.join(File.dirname(__FILE__), "..", "..", "drop-table.sql")
    pg_connection.exec(File.read(sql_path))
  end
end
read(*channels, connection: nil)
click to toggle source
# File lib/pg_versions/pg_versions.rb, line 109
#
# Reads the current version of every given channel. Channels that have no
# row in pg_versions yet are bumped (see bump) so that every requested
# channel ends up with a version.
#
# channels   - channel names (Strings); nested arrays are flattened and
#              repeated names are collapsed.
# connection - optional connection object handed to PgVersions.with_connection.
#
# Returns a Hash mapping each channel name to its version as produced by
# string_to_version; an empty Hash when no channels were given.
def self.read(*channels, connection: nil)
  PgVersions.with_connection(connection) do |pg_connection|
    # uniq: a name requested twice must not reach bump twice (its upsert
    # cannot touch the same row more than once per statement).
    channels = channels.flatten.uniq.sort
    return {} if channels.empty?

    quoted_channels = channels.each_with_index.map do |channel, i|
      "(#{i},'#{pg_connection.escape_string(channel)}')"
    end.join(", ")

    found_rows = pg_connection.exec(<<~SQL)
      LOCK TABLE pg_versions IN ACCESS SHARE MODE;
      WITH channels(i, channel) AS (VALUES #{quoted_channels})
      SELECT
        i
        , #{timestamp_to_integers('instant')} || ',' || counter AS version
      FROM
        channels
        JOIN pg_versions ON pg_versions.channel = channels.channel
      ORDER BY
        i DESC;
    SQL

    versions = {}
    # Rows arrive with descending "i", so deleting by index never shifts the
    # position of an entry that is still to be removed. Whatever remains in
    # `channels` afterwards was not found in the table.
    found_rows.each do |row|
      versions[channels.delete_at(Integer(row["i"]))] = string_to_version(row["version"])
    end

    #TODO: bump in the same query instead of calling bump
    versions.merge!(self.bump(channels, connection: pg_connection)) unless channels.empty?
    versions
  end
end
string_to_version(version_str)
click to toggle source
# File lib/pg_versions/pg_versions.rb, line 55
#
# Converts a comma-separated version string into an Array of Integers,
# e.g. "20240102,30405,6" => [20240102, 30405, 6].
#
# version_str - String of comma-separated decimal integers.
#
# Returns an Array of Integers, one per component.
# Raises ArgumentError when a component is not a valid decimal integer.
def self.string_to_version(version_str)
  # Explicit base 10: without it Integer() treats a leading "0" as an octal
  # prefix, so "010" would become 8 and "08" would raise.
  version_str.split(",").map { |component| Integer(component, 10) }
end
timestamp_to_integers(input)
click to toggle source
# File lib/pg_versions/pg_versions.rb, line 37
#
# Builds an SQL expression that renders the timestamp expression `input` as
# three comma-separated integers, using the to_char patterns 'YYYYMMDD',
# 'HH24MISS' and 'US' in that order.
#
# input - SQL expression (String) to format.
#
# Returns the SQL fragment as a String.
def self.timestamp_to_integers(input)
  %w[YYYYMMDD HH24MISS US]
    .map { |pattern| "to_char(#{input}, '#{pattern}')::integer" }
    .join(" || ',' || ")
end
with_connection(pg_connection) { |pg_connection| ... }
click to toggle source
# File lib/pg_versions/pg_versions.rb, line 42
#
# Yields a connection to the given block.
#
# When `pg_connection` is given it is yielded unchanged. Otherwise, if
# ActiveRecord is loaded, a connection is checked out of
# ActiveRecord::Base.connection_pool for the duration of the block and its
# underlying driver connection is yielded.
#
# pg_connection - connection object to use, or nil to fall back to ActiveRecord.
#
# Returns the value of the block.
# Raises ConnectionAcquisitionFailedError when no connection was passed and
# ActiveRecord is not defined.
def self.with_connection(pg_connection)
  return yield(pg_connection) if pg_connection

  unless defined?(ActiveRecord)
    raise ConnectionAcquisitionFailedError, "Missing connection. Either pass pg connection object or import ActiveRecord."
  end

  ActiveRecord::Base.connection_pool.with_connection do |ar_connection|
    # raw_connection is the adapter's public accessor for the driver
    # connection; reading the private @connection ivar depends on adapter
    # internals and yields nil where that ivar is named differently.
    yield ar_connection.raw_connection
  end
end