module PG::EM::Client::Watcher

This module is used as a handler to ::EM.watch connection socket and it extracts query results in a non-blocking manner.

Author

Rafal Michalski

Public Class Methods

new(client) click to toggle source
# File lib/pg/em/client/watcher.rb, line 11
def initialize(client)
  @client = client
  @is_connected = true
  @one_result_mode = false
  @deferrable = nil
  @notify_deferrable = nil
  @timer = nil
  @notify_timer = nil
end

Public Instance Methods

cancel_notify_timer() click to toggle source
# File lib/pg/em/client/watcher.rb, line 79
def cancel_notify_timer
  if @notify_timer
    @notify_timer.cancel
    @notify_timer = nil
  end
end
cancel_timer() click to toggle source
# File lib/pg/em/client/watcher.rb, line 86
def cancel_timer
  if @timer
    @timer.cancel
    @timer = nil
  end
end
check_notify() click to toggle source
# File lib/pg/em/client/watcher.rb, line 102
def check_notify
  if notify_hash = @client.notifies
    cancel_notify_timer
    succeed_notify notify_hash
  end
end
fetch_results() click to toggle source

Carefully extract results without blocking the EventMachine reactor.

# File lib/pg/em/client/watcher.rb, line 111
def fetch_results
  result = false
  until @client.is_busy
    single_result = @client.blocking_get_result
    if one_result_mode?
      result = single_result
      break
    elsif single_result.nil?
      if result = @last_result
        result.check
      end
      break
    end
    @last_result.clear if @last_result
    @last_result = single_result
  end
rescue Exception => e
  handle_error e
else
  if result == false
    @readable_timestamp = Time.now if @timer
  else
    cancel_timer
    self.notify_readable = false unless @notify_deferrable
    df = @deferrable
    @deferrable = @send_proc = nil
    df.succeed result
  end
end
notify_readable() click to toggle source
# File lib/pg/em/client/watcher.rb, line 93
def notify_readable
  @client.consume_input
rescue Exception => e
  handle_error e
else
  fetch_results if @deferrable
  check_notify if @notify_deferrable
end
one_result_mode?() click to toggle source
# File lib/pg/em/client/watcher.rb, line 25
def one_result_mode?
  @one_result_mode
end
setup_timer(timeout, adjustment = 0) click to toggle source
# File lib/pg/em/client/watcher.rb, line 58
def setup_timer(timeout, adjustment = 0)
  @timer = ::EM::Timer.new(timeout - adjustment) do
    if (last_interval = Time.now - @readable_timestamp) >= timeout
      @timer = nil
      cancel_notify_timer
      self.notify_readable = false
      @client.async_command_aborted = true
      @send_proc = nil
      begin
        @client.raise_error ConnectionBad, "query timeout expired (async)"
      rescue Exception => e
        fail_result e
        # notify should also fail: query timeout is like connection error
        fail_notify e
      end
    else
      setup_timer timeout, last_interval
    end
  end
end
unbind() click to toggle source
# File lib/pg/em/client/watcher.rb, line 141
def unbind
  @is_connected = false
  cancel_timer
  cancel_notify_timer
  if @deferrable || @notify_deferrable
    @client.raise_error ConnectionBad, "connection reset"
  end
rescue Exception => e
  fail_result e
  fail_notify e
end
watch_notify(deferrable, timeout = nil) click to toggle source
# File lib/pg/em/client/watcher.rb, line 43
def watch_notify(deferrable, timeout = nil)
  notify_df = @notify_deferrable
  @notify_deferrable = deferrable
  cancel_notify_timer
  self.notify_readable = true unless notify_readable?
  if timeout
    @notify_timer = ::EM::Timer.new(timeout) do
      @notify_timer = nil
      succeed_notify
    end
  end
  notify_df.fail nil if notify_df
  check_notify
end
watch_results(deferrable, send_proc = nil, one_result_mode = false) click to toggle source
# File lib/pg/em/client/watcher.rb, line 29
def watch_results(deferrable, send_proc = nil, one_result_mode = false)
  @one_result_mode = one_result_mode
  @last_result = nil
  @deferrable = deferrable
  @send_proc = send_proc
  cancel_timer
  self.notify_readable = true unless notify_readable?
  if (timeout = @client.query_timeout) > 0
    @readable_timestamp = Time.now
    setup_timer timeout
  end
  fetch_results
end
watching?() click to toggle source
# File lib/pg/em/client/watcher.rb, line 21
def watching?
  @is_connected
end

Private Instance Methods

fail_notify(e) click to toggle source
# File lib/pg/em/client/watcher.rb, line 168
def fail_notify(e)
  df = @notify_deferrable
  @notify_deferrable = nil
  df.fail e if df
end
fail_result(e) click to toggle source
# File lib/pg/em/client/watcher.rb, line 155
def fail_result(e)
  df = @deferrable
  @deferrable = nil
  df.fail e if df
end
handle_error(e) click to toggle source
# File lib/pg/em/client/watcher.rb, line 174
def handle_error(e)
  cancel_timer
  send_proc = @send_proc
  @send_proc = nil
  df = @deferrable || FeaturedDeferrable.new
  # prevent unbind error on auto re-connect
  @deferrable = nil
  notify_df = @notify_deferrable
  self.notify_readable = false unless notify_df
  if e.is_a?(PG::Error)
    @client.async_autoreconnect!(df, e, send_proc) do
      # there was a connection error so stop any remaining activity
      if notify_df
        @notify_deferrable = nil
        cancel_notify_timer
        self.notify_readable = false
        # fail notify_df after deferrable completes
        # handler might setup listen again then immediately
        df.completion { notify_df.fail e }
      end
    end
  else
    df.fail e
  end
end
succeed_notify(notify_hash = nil) click to toggle source
# File lib/pg/em/client/watcher.rb, line 161
def succeed_notify(notify_hash = nil)
  self.notify_readable = false unless @deferrable
  df = @notify_deferrable
  @notify_deferrable = nil
  df.succeed notify_hash
end