class PgVersions::Connection
Public Class Methods
new(connection=nil)
click to toggle source
# File lib/pg_versions/pg_versions.rb, line 147 def initialize(connection=nil) @actor_commands = Queue.new actor_notify_r, @actor_notify_w = IO.pipe connection_error_queue = Queue.new @actor = Thread.new { begin PgVersions.with_connection(connection) { |pg_connection| connection_error_queue << false subscribers = Hash.new { |h,k| h[k] = Set.new } loop { #TODO: handle errors reads,_writes,_errors = IO::select([pg_connection.socket_io, actor_notify_r]) if reads.include?(pg_connection.socket_io) pg_connection.consume_input end if reads.include?(actor_notify_r) @actor_commands.shift.call(pg_connection, subscribers) actor_notify_r.read(1) end while notification = pg_connection.notifies channel, payload = notification[:relname], notification[:extra] subscribers[channel].each { |subscriber| subscriber.notify(channel, PgVersions.string_to_version(payload)) } end } } rescue ConnectionAcquisitionFailedError => e connection_error_queue << e end } (connection_error = connection_error_queue.shift) and raise connection_error end
Public Instance Methods
actor_call(&block)
click to toggle source
# File lib/pg_versions/pg_versions.rb, line 186 def actor_call(&block) done = Queue.new @actor_commands << proc { |pg_connection, subscribers| done << block.call(pg_connection, subscribers) } @actor_notify_w.write('!') done.shift end
bump(*channels)
click to toggle source
# File lib/pg_versions/pg_versions.rb, line 196 def bump(*channels) actor_call { |pg_connection, _subscribers| PgVersions.bump(channels, connection: pg_connection) } end
read(*channels)
click to toggle source
# File lib/pg_versions/pg_versions.rb, line 203 def read(*channels) actor_call { |pg_connection, _subscribers| PgVersions.read(channels, connection: pg_connection) } end
subscribe(*channels, known: {})
click to toggle source
# File lib/pg_versions/pg_versions.rb, line 210 def subscribe(*channels, known: {}) subscription = Subscription.new(self) subscription.subscribe([channels].flatten, known: known) subscription end