class Vx::Consumer::Session

Attributes

conn[R]

Public Instance Methods

allocate_pub_channel() { || ... } click to toggle source
# File lib/vx/consumer/session.rb, line 112
# Runs the given block with a publishing channel stored in
# Thread.current[:vx_consumer_session_pub_channel], where with_pub_channel
# looks for it.
#
# If a channel is already stored there (a nested call), the block is simply
# yielded to and the outer call stays responsible for closing the channel.
# Otherwise a new channel is created on conn, error handlers are attached,
# and the channel is closed and the slot cleared once the block finishes,
# whether it returned normally or raised.
#
# Yields no arguments. Returns the block's value.
# Raises whatever assert_connection_is_open raises when the session is not open.
def allocate_pub_channel
  assert_connection_is_open

  key = :vx_consumer_session_pub_channel

  # Nested call: reuse the channel owned by the outer invocation.
  return yield if Thread.current[key]

  ch = conn.create_channel
  begin
    assign_error_handlers_to_channel(ch)
    Thread.current[key] = ch
    yield
  ensure
    begin
      # Close the channel we created (held in a local) rather than re-reading
      # the slot: the slot may have been cleared by the block, and a nil there
      # would raise NoMethodError and mask the block's own exception.
      ch.close if ch.open?
    ensure
      # Always release the slot, even when close itself raises, so a dead
      # channel is never left behind for later with_pub_channel calls.
      Thread.current[key] = nil
    end
  end
end
assign_error_handlers_to_channel(ch) click to toggle source
# File lib/vx/consumer/session.rb, line 147
# Registers one reporting callback on +ch+ for both on_uncaught_exception and
# on_error. The callback forwards its two block arguments to
# ::Vx::Consumer.exception_handler as (first, consumer: second).
#
# Returns whatever ch.on_error returns.
#
# NOTE(review): Bunny's Channel#on_error appears to yield (channel, close
# frame) rather than (exception, consumer) - confirm the handler copes with
# those values in these positions.
def assign_error_handlers_to_channel(ch)
  # A plain proc (not a lambda) keeps block-style lenient arity.
  report = proc do |e, c|
    ::Vx::Consumer.exception_handler(e, consumer: c)
  end

  ch.on_uncaught_exception(&report)
  ch.on_error(&report)
end
close() click to toggle source
# File lib/vx/consumer/session.rb, line 40
# Closes the connection if the session is currently open, then forgets it
# by setting @conn to nil. Does nothing when the session is not open.
#
# The work happens under @@session_lock. Bunny::ChannelError and
# Bunny::ClientTimeout raised while closing are passed to
# Consumer.exception_handler instead of propagating.
#
# Always returns nil.
def close
  return unless open?

  @@session_lock.synchronize do
    # Re-check under the lock (as #open does): another thread may have closed
    # the session while we were waiting, in which case conn is nil here and
    # conn.close would raise NoMethodError.
    next unless open?

    instrument("closing_connection", info: conn_info)

    instrument("close_connection", info: conn_info) do
      begin
        conn.close
        # Poll until the connection reports the :closed status.
        sleep 0.01 while conn.status != :closed
      rescue Bunny::ChannelError, Bunny::ClientTimeout => e
        Consumer.exception_handler(e, {})
      end
    end

    @conn = nil
  end
end
conn_info() click to toggle source
# File lib/vx/consumer/session.rb, line 93
# Describes the current connection for instrumentation payloads.
#
# Returns "amqp://user@host:port/vhost" built from the connection's user,
# host, port and vhost, or "not connected" when there is no connection.
def conn_info
  session = conn
  return "not connected" unless session

  user  = session.user
  host  = session.host
  port  = session.port
  vhost = session.vhost

  "amqp://#{user}@#{host}:#{port}/#{vhost}"
end
declare_exchange(ch, name, options = nil) click to toggle source
# File lib/vx/consumer/session.rb, line 133
# Declares (or looks up) an exchange on the given channel.
#
# ch      - channel object responding to #exchange(name, options).
# name    - exchange name, passed through unchanged.
# options - options Hash; nil (the default) is replaced by an empty Hash.
#
# Returns whatever ch.exchange returns.
# Raises whatever assert_connection_is_open raises when the session is not open.
def declare_exchange(ch, name, options = nil)
  assert_connection_is_open

  ch.exchange(name, options || {})
end
declare_queue(ch, name, options = nil) click to toggle source
# File lib/vx/consumer/session.rb, line 140
# Declares (or looks up) a queue on the given channel.
#
# ch      - channel object responding to #queue(name, options).
# name    - queue name, passed through unchanged.
# options - options Hash; nil (the default) is replaced by an empty Hash.
#
# Returns whatever ch.queue returns.
# Raises whatever assert_connection_is_open raises when the session is not open.
def declare_queue(ch, name, options = nil)
  assert_connection_is_open

  ch.queue(name, options || {})
end
live?() click to toggle source
# File lib/vx/consumer/session.rb, line 25
# Returns the current value of the class-level @@live flag, which #resume
# sets to true and #shutdown sets to false.
def live?
  @@live
end
open(options = {}) click to toggle source
# File lib/vx/consumer/session.rb, line 60
# Opens the connection unless the session is already open.
#
# The open? check is made once without the lock (fast path) and again while
# holding @@session_lock, so a caller that waited for the lock does not
# connect a second time. When a connection is needed, the session is marked
# live via #resume, a Bunny connection is created if @conn is not set yet,
# and the start is reported through instrument.
#
# options - Hash merged over { info: conn_info } to form the instrumentation
#           payload (default: {}).
#
# Returns self.
def open(options = {})
  return self if open?

  @@session_lock.synchronize do
    next if open?

    resume

    @conn ||= Bunny.new(
      nil,       # from ENV['RABBITMQ_URL']
      heartbeat: Consumer.configuration.heartbeat,
      automatically_recover: false
    )

    payload = { info: conn_info }.merge(options)

    instrument("start_connecting", payload)
    instrument("connect", payload) do
      conn.start
      # Poll until the connection is no longer in the connecting state.
      sleep 0.01 while conn.connecting?
    end
  end

  self
end
open?() click to toggle source
# File lib/vx/consumer/session.rb, line 89
# Tells whether the session has a usable connection: conn must be set,
# conn.open? must be truthy and conn.status must equal :open.
#
# Returns true in that case; otherwise the first falsy value encountered
# (nil/false from conn or conn.open?, or false from the status comparison).
def open?
  session = conn
  return session unless session

  session.open? && session.status == :open
end
resume() click to toggle source
# File lib/vx/consumer/session.rb, line 29
# Marks the session as live by setting the class-level @@live flag to true
# (the value #live? reports). Returns true.
def resume
  @@live = true
end
shutdown() click to toggle source
# File lib/vx/consumer/session.rb, line 18
# Signals shutdown: while holding @@shutdown_lock, clears the class-level
# @@live flag and broadcasts on @@shutdown so that threads blocked in
# #wait_shutdown are woken up.
# NOTE(review): @@shutdown is presumably a ConditionVariable - it is defined
# outside this view; confirm.
def shutdown
  @@shutdown_lock.synchronize do
    # Flip the flag before broadcasting so woken waiters observe live? == false.
    @@live = false
    @@shutdown.broadcast
  end
end
wait_shutdown(timeout = nil) click to toggle source
# File lib/vx/consumer/session.rb, line 33
# Blocks until #shutdown is signalled or +timeout+ elapses.
#
# timeout - maximum time to wait, passed to @@shutdown.wait; nil (the
#           default) waits without a time limit.
#
# Returns true when the session is no longer live, false otherwise (for
# example when the wait timed out while still live).
def wait_shutdown(timeout = nil)
  @@shutdown_lock.synchronize do
    # Only wait while the session is still live. If #shutdown already ran,
    # its broadcast has come and gone; waiting unconditionally would miss it
    # and block for the full timeout - or forever when timeout is nil.
    @@shutdown.wait(@@shutdown_lock, timeout) if live?
    !live?
  end
end
with_pub_channel() { |ch| ... } click to toggle source
# File lib/vx/consumer/session.rb, line 101
# Yields a channel to publish on.
#
# When a channel is stored in Thread.current[:vx_consumer_session_pub_channel]
# (as done by #allocate_pub_channel), that channel is yielded and the block's
# value is returned. Otherwise the block runs inside conn.with_channel and
# that call's return value is returned.
def with_pub_channel
  pinned = Thread.current[:vx_consumer_session_pub_channel]
  return yield(pinned) if pinned

  conn.with_channel { |temporary| yield temporary }
end

Private Instance Methods

assert_connection_is_open() click to toggle source
# File lib/vx/consumer/session.rb, line 154
# Guard used before operations that need a live connection.
#
# Returns the (truthy) result of open?.
# Raises ConnectionDoesNotExistError when open? is falsy.
def assert_connection_is_open
  state = open?
  raise ConnectionDoesNotExistError unless state

  state
end
config() click to toggle source
# File lib/vx/consumer/session.rb, line 158
# Shorthand for Consumer.configuration; returns whatever that call returns.
def config
  Consumer.configuration
end