class Bunny::ReaderLoop

Network activity loop that reads and passes incoming AMQP 0.9.1 methods for processing. They are dispatched further down the line in Bunny::Session and Bunny::Channel. This loop uses a separate thread internally.

This mimics the way RabbitMQ Java is designed quite closely. @private

Public Class Methods

new(transport, session, session_error_handler) click to toggle source
# File lib/bunny/reader_loop.rb, line 12
def initialize(transport, session, session_error_handler)
  @transport             = transport
  @session               = session
  @session_error_handler = session_error_handler
  @logger                = @session.logger

  @mutex                 = Mutex.new

  @stopping              = false
  @stopped               = false
  @network_is_down       = false
end

Public Instance Methods

join() click to toggle source
# File lib/bunny/reader_loop.rb, line 118
def join
  # Thread#join can/would trigger a re-raise of an unhandled exception in this thread.
  # In addition, Thread.handle_interrupt can be used by other libraries or application code
  # that would make this join operation fail with an obscure exception.
  # So we try to save everyone some really unpleasant debugging time by introducing
  # this condition which typically would not evaluate to true anyway.
  #
  # See ruby-amqp/bunny#589 and ruby-amqp/bunny#590 for background.
  @thread.join if @thread && @thread != Thread.current
end
kill() click to toggle source
# File lib/bunny/reader_loop.rb, line 129
def kill
  if @thread
    @thread.kill
    @thread.join
  end
end
raise(e) click to toggle source
# File lib/bunny/reader_loop.rb, line 114
def raise(e)
  @thread.raise(e) if @thread
end
resume() click to toggle source
# File lib/bunny/reader_loop.rb, line 30
def resume
  start
end
run_loop() click to toggle source
# File lib/bunny/reader_loop.rb, line 35
def run_loop
  loop do
    begin
      break if @mutex.synchronize { @stopping || @stopped || @network_is_down }
      run_once
    rescue AMQ::Protocol::EmptyResponseError, IOError, SystemCallError, Timeout::Error,
           OpenSSL::OpenSSLError => e
      break if terminate? || @session.closing? || @session.closed?

      @network_is_down = true
      if @session.automatically_recover?
        log_exception(e, level: :warn)
        @session.handle_network_failure(e)
      else
        log_exception(e)
        @session_error_handler.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e))
      end
    rescue ShutdownSignal => _
      @mutex.synchronize { @stopping = true }
      break
    rescue Exception => e
      break if terminate?
      if !(@session.closing? || @session.closed?)
        log_exception(e)

        @network_is_down = true
        @session_error_handler.raise(Bunny::NetworkFailure.new("caught an unexpected exception in the network loop: #{e.message}", e))
      end
    rescue Errno::EBADF => _ebadf
      break if terminate?
      # ignored, happens when we loop after the transport has already been closed
      @mutex.synchronize { @stopping = true }
    end
  end

  @mutex.synchronize { @stopped = true }
end
run_once() click to toggle source
# File lib/bunny/reader_loop.rb, line 73
def run_once
  frame = @transport.read_next_frame
  return if frame.is_a?(AMQ::Protocol::HeartbeatFrame)

  if !frame.final? || frame.method_class.has_content?
    header   = @transport.read_next_frame
    content  = ''

    if header.body_size > 0
      loop do
        body_frame = @transport.read_next_frame
        content << body_frame.decode_payload

        break if content.bytesize >= header.body_size
      end
    end

    @session.handle_frameset(frame.channel, [frame.decode_payload, header.decode_payload, content])
  else
    @session.handle_frame(frame.channel, frame.decode_payload)
  end
end
start() click to toggle source
# File lib/bunny/reader_loop.rb, line 26
def start
  @thread    = Thread.new(&method(:run_loop))
end
stop() click to toggle source
# File lib/bunny/reader_loop.rb, line 96
def stop
  @mutex.synchronize { @stopping = true }
end
stopped?() click to toggle source
# File lib/bunny/reader_loop.rb, line 100
def stopped?
  @mutex.synchronize { @stopped }
end
stopping?() click to toggle source
# File lib/bunny/reader_loop.rb, line 104
def stopping?
  @mutex.synchronize { @stopping }
end
terminate_with(e) click to toggle source
# File lib/bunny/reader_loop.rb, line 108
def terminate_with(e)
  @mutex.synchronize { @stopping = true }

  self.raise(e)
end

Protected Instance Methods

io_error?(e) click to toggle source
# File lib/bunny/reader_loop.rb, line 148
def io_error?(e)
  [AMQ::Protocol::EmptyResponseError, IOError, SystemCallError].any? do |klazz|
    e.is_a?(klazz)
  end
end
log_exception(e, level: :error) click to toggle source
# File lib/bunny/reader_loop.rb, line 138
def log_exception(e, level: :error)
  if !(io_error?(e) && (@session.closing? || @session.closed?))
    @logger.send level, "Exception in the reader loop: #{e.class.name}: #{e.message}"
    @logger.send level, "Backtrace: "
    e.backtrace.each do |line|
      @logger.send level, "\t#{line}"
    end
  end
end
terminate?() click to toggle source
# File lib/bunny/reader_loop.rb, line 154
def terminate?
  @mutex.synchronize { @stopping || @stopped }
end