class PgVersions::Connection::Subscription

Public Class Methods

new(connection) click to toggle source
# File lib/pg_versions/pg_versions.rb, line 219
# Builds a subscription bound to +connection+.
#
# Prepares the state the other methods rely on: the queue that #notify
# pushes to and #wait pops from, the per-channel record of versions that
# are already known, and the per-channel subscription counters.
def initialize(connection)
        # Reading an unseen channel stores and returns a fresh empty array.
        @already_known_versions = Hash.new { |known, channel| known[channel] = [] }
        @notifications = Queue.new
        @connection = connection
        # Subscription count per channel; unseen channels read as 0.
        @channels = Hash.new(0)
end

Public Instance Methods

bump(*channels, notify: true) click to toggle source
# File lib/pg_versions/pg_versions.rb, line 277
# Runs PgVersions.bump for +channels+ inside the connection's actor call.
#
# With no channels given, every channel currently tracked in @channels is
# used. When +notify+ is false the resulting versions are recorded as
# already known, so #wait will skip notifications carrying them.
#
# Returns whatever PgVersions.bump returned.
def bump(*channels, notify: true)
        targets = channels.empty? ? @channels.keys : channels
        bumped = @connection.actor_call do |pg_connection, _subscribers|
                PgVersions.bump(targets, connection: pg_connection)
        end
        update_already_known_versions(bumped)  unless notify
        bumped
end
drop() click to toggle source
# File lib/pg_versions/pg_versions.rb, line 305
# Shuts the subscription down: wakes up any #wait caller and releases
# every channel this subscription holds.
#
# The [nil, nil] entry is the termination marker that makes #wait return
# nil.
#
# #unsubscribe releases only one reference per channel per call, so a
# channel subscribed more than once would otherwise survive a drop with a
# positive count. Each count is therefore collapsed to 1 first, making the
# single #unsubscribe call below release everything.
def drop
        @notifications << [nil, nil]
        held = @channels.keys
        held.each { |channel| @channels[channel] = 1 }
        unsubscribe(held)
end
notify(channel, payload) click to toggle source
# File lib/pg_versions/pg_versions.rb, line 300
# Queues a [channel, payload] pair for a later #wait call to pick up.
def notify(channel, payload)
        @notifications.push([channel, payload])
end
read(*channels, notify: true) click to toggle source
# File lib/pg_versions/pg_versions.rb, line 267
# Runs PgVersions.read for +channels+ inside the connection's actor call.
#
# With no channels given, every channel currently tracked in @channels is
# used. When +notify+ is false the versions read are recorded as already
# known, so #wait will skip notifications carrying them.
#
# Returns whatever PgVersions.read returned.
def read(*channels, notify: true)
        targets = channels.empty? ? @channels.keys : channels
        current = @connection.actor_call do |pg_connection, _subscribers|
                PgVersions.read(targets, connection: pg_connection)
        end
        update_already_known_versions(current)  unless notify
        current
end
subscribe(channels, known: {}) click to toggle source
# File lib/pg_versions/pg_versions.rb, line 227
# Subscribes to +channels+ (a single channel or a possibly nested list).
#
# +known+ is merged into the already-known versions first. Every channel
# has its counter in @channels incremented; only channels whose counter
# just became 1 are registered with the connection. Registration happens
# inside the connection's actor call: this subscription is appended to the
# channel's subscriber list, LISTEN is issued when it is the only entry in
# that list, and the versions returned by PgVersions.read for the newly
# registered channels are queued through #notify.
#
# Returns nil when no channel needed registering.
def subscribe(channels, known: {})
        update_already_known_versions(known)
        first_time = [channels].flatten.select { |name| (@channels[name] += 1) == 1 }
        return nil  if first_time.empty?

        @connection.actor_call do |pg_connection, subscribers|
                first_time.each do |name|
                        subscribers[name] << self
                        next  unless subscribers[name].size == 1
                        pg_connection.exec("LISTEN #{PG::Connection.quote_ident(name)}")
                end
                PgVersions.read(first_time, connection: pg_connection).each_pair do |name, version|
                        notify(name, version)
                end
        end
end
unsubscribe(*channels) click to toggle source
# File lib/pg_versions/pg_versions.rb, line 247
# Releases one reference to each of +channels+ (a channel listed several
# times is released that many times).
#
# Channels whose counter in @channels reaches zero are removed from it and
# deregistered inside the connection's actor call: this subscription is
# deleted from the channel's subscriber list, and when that list becomes
# empty UNLISTEN is issued and the list itself is dropped.
#
# Raises RuntimeError when a channel is released more often than it was
# subscribed. The whole request is validated before anything is changed,
# so a failed call leaves @channels untouched (no negative counters, no
# channel forgotten locally while still registered with the connection).
#
# Returns nil without contacting the connection when no counter reached
# zero, mirroring #subscribe.
def unsubscribe(*channels)
        requested = Hash.new(0)
        channels.flatten.each { |channel| requested[channel] += 1 }

        # Validate first: raising half-way through would corrupt the counters.
        requested.each { |channel, count|
                raise "Trying to unsubscribe from channel (%p) more times than it was subscribed to"%[channel]  if count > @channels[channel]
        }

        released = []
        requested.each { |channel, count|
                remaining = @channels[channel] - count
                if remaining == 0
                        @channels.delete(channel)
                        released << channel
                else
                        @channels[channel] = remaining
                end
        }
        return nil  if released.empty?

        @connection.actor_call { |pg_connection, subscribers|
                released.each { |channel|
                        subscribers[channel].delete(self)
                        if subscribers[channel].size == 0
                                pg_connection.exec("UNLISTEN #{PG::Connection.quote_ident(channel)}")
                                subscribers.delete(channel)
                        end
                }
        }
end
update_already_known_versions(new_already_known_versions) click to toggle source
# File lib/pg_versions/pg_versions.rb, line 311
# Raises the recorded known version of each channel to the one supplied in
# +new_already_known_versions+, but only where the supplied version
# compares greater (<=> returns 1) than the one currently recorded.
# Incomparable or not-greater versions are ignored.
#
# Returns +new_already_known_versions+.
def update_already_known_versions(new_already_known_versions)
        new_already_known_versions.each do |channel, candidate|
                next  unless (candidate <=> @already_known_versions[channel]) == 1
                @already_known_versions[channel] = candidate
        end
end
wait(new_already_known_versions = {}) click to toggle source
# File lib/pg_versions/pg_versions.rb, line 287
# Blocks until a notification carrying a version newer than the one already
# known for its channel arrives, and returns it as a Notification.
#
# +new_already_known_versions+ is merged into the known versions before
# waiting. Queued entries whose version does not compare greater than the
# known one are discarded. An entry with a falsy channel is the termination
# marker and makes the method return nil.
def wait(new_already_known_versions = {})
        update_already_known_versions(new_already_known_versions)
        loop do
                channel, version = @notifications.pop
                return nil  unless channel # termination
                next  unless (@already_known_versions[channel] <=> version) == -1
                @already_known_versions[channel] = version
                return Notification.new(channel, version)
        end
end