class PgVersions::Connection

Public Class Methods

new(connection=nil) click to toggle source
# File lib/pg_versions/pg_versions.rb, line 147
def initialize(connection=nil)
        @actor_commands = Queue.new
        actor_notify_r, @actor_notify_w = IO.pipe

        connection_error_queue = Queue.new
        @actor = Thread.new {
                begin
                        PgVersions.with_connection(connection) { |pg_connection|
                                connection_error_queue << false
                                subscribers = Hash.new { |h,k| h[k] = Set.new }
                                loop {
                                        #TODO: handle errors
                                        reads,_writes,_errors = IO::select([pg_connection.socket_io, actor_notify_r])
                                        
                                        if reads.include?(pg_connection.socket_io)
                                                pg_connection.consume_input
                                        end

                                        if reads.include?(actor_notify_r)
                                                @actor_commands.shift.call(pg_connection, subscribers)
                                                actor_notify_r.read(1)
                                        end

                                        while notification = pg_connection.notifies
                                                channel, payload = notification[:relname], notification[:extra]
                                                subscribers[channel].each { |subscriber|
                                                        subscriber.notify(channel, PgVersions.string_to_version(payload))
                                                }
                                        end
                                }
                        }
                rescue ConnectionAcquisitionFailedError => e
                        connection_error_queue << e
                end
        }
        (connection_error = connection_error_queue.shift) and raise connection_error
end

Public Instance Methods

actor_call(&block) click to toggle source
# File lib/pg_versions/pg_versions.rb, line 186
def actor_call(&block)
        done = Queue.new
        @actor_commands << proc { |pg_connection, subscribers|
                done << block.call(pg_connection, subscribers)
        } 
        @actor_notify_w.write('!')
        done.shift
end
bump(*channels) click to toggle source
# File lib/pg_versions/pg_versions.rb, line 196
def bump(*channels)
        actor_call { |pg_connection, _subscribers|
                PgVersions.bump(channels, connection: pg_connection)
        }
end
read(*channels) click to toggle source
# File lib/pg_versions/pg_versions.rb, line 203
def read(*channels)
        actor_call { |pg_connection, _subscribers|
                PgVersions.read(channels, connection: pg_connection)
        }
end
subscribe(*channels, known: {}) click to toggle source
# File lib/pg_versions/pg_versions.rb, line 210
def subscribe(*channels, known: {})
        subscription = Subscription.new(self)
        subscription.subscribe([channels].flatten, known: known)
        subscription
end