class Bunny::ReaderLoop

Network activity loop that reads and passes incoming AMQP 0.9.1 methods for processing. They are dispatched further down the line in Bunny::Session and Bunny::Channel. This loop uses a separate thread internally.

This closely mimics the design of the RabbitMQ Java client. @private

Public Class Methods

new(transport, session, session_thread) click to toggle source
# File lib/bunny/reader_loop.rb, line 12
# Builds a reader loop bound to a transport and the session that owns it.
#
# @param transport [Object] object frames are read from (via #read_next_frame)
# @param session [Object] session that frames and failures are handed to;
#   its logger is reused by this loop
# @param session_thread [Thread] thread that network failures are raised into
def initialize(transport, session, session_thread)
  @session_thread = session_thread
  @session        = session
  @transport      = transport
  @logger         = session.logger

  # Guards the @stopping / @stopped flags.
  @mutex = Mutex.new
end

Public Instance Methods

join() click to toggle source
# File lib/bunny/reader_loop.rb, line 113
# Waits for the reader thread to finish.
#
# @return [Thread, nil] the result of Thread#join, or nil when no thread
#   has been started
def join
  return unless @thread

  @thread.join
end
kill() click to toggle source
# File lib/bunny/reader_loop.rb, line 117
# Kills the reader thread and waits until it has terminated.
#
# @return [Thread, nil] the result of Thread#join, or nil when no thread
#   has been started
def kill
  return unless @thread

  @thread.kill
  @thread.join
end
raise(e) click to toggle source
# File lib/bunny/reader_loop.rb, line 109
# Raises the given exception inside the reader thread.
# Does nothing when no thread has been started.
#
# @param e [Exception] exception to deliver to the reader thread
def raise(e)
  return unless @thread

  @thread.raise(e)
end
resume() click to toggle source
# File lib/bunny/reader_loop.rb, line 26
# Resumes reading by delegating to #start.
#
# @return whatever #start returns
def resume
  start
end
run_loop() click to toggle source
# File lib/bunny/reader_loop.rb, line 31
# Body of the reader thread: calls #run_once repeatedly until the loop is
# asked to stop, has stopped, or a network failure has been recorded.
# Exceptions raised while reading are translated into session-level failure
# handling. @stopped is set to true once the loop exits.
#
# NOTE(review): rescue clauses are tried top to bottom. Errno::EBADF is a
# SystemCallError, so it is caught by the first clause (and would otherwise
# be caught by `rescue Exception`); the trailing `rescue Errno::EBADF` clause
# can never run. Confirm which handling is intended before reordering.
def run_loop
  loop do
    begin
      # Exit condition is checked before every read.
      break if @mutex.synchronize { @stopping || @stopped || @network_is_down }
      run_once
    rescue AMQ::Protocol::EmptyResponseError, IOError, SystemCallError, Timeout::Error => e
      # These errors are ignored when the loop or the session is shutting down.
      break if terminate? || @session.closing? || @session.closed?

      # Recording the failure makes the check at the top of the loop break
      # on the next iteration.
      @network_is_down = true
      if @session.automatically_recover?
        log_exception(e, level: :warn)
        @session.handle_network_failure(e)
      else
        log_exception(e)
        # Without automatic recovery the failure is surfaced on the session thread.
        @session_thread.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e))
      end
    rescue ShutdownSignal => _
      @mutex.synchronize { @stopping = true }
      break
    rescue Exception => e
      # Anything else, including non-StandardError exceptions.
      break if terminate?
      # NOTE(review): when the session is closing or closed the exception is
      # dropped and the loop iterates again — confirm this cannot spin.
      if !(@session.closing? || @session.closed?)
        log_exception(e)

        @network_is_down = true
        @session_thread.raise(Bunny::NetworkFailure.new("caught an unexpected exception in the network loop: #{e.message}", e))
      end
    rescue Errno::EBADF => _ebadf
      # NOTE(review): unreachable — see the note above the method.
      break if terminate?
      # ignored, happens when we loop after the transport has already been closed
      @mutex.synchronize { @stopping = true }
    end
  end

  @mutex.synchronize { @stopped = true }
end
run_once() click to toggle source
# File lib/bunny/reader_loop.rb, line 68
# Reads one frame (or one full frameset) from the transport and hands it to
# the session.
#
# Heartbeat frames are dropped. A frame that is final and whose method class
# carries no content goes to @session.handle_frame. Otherwise a header frame
# is read, body frames are accumulated until at least header.body_size bytes
# have been collected, and the decoded method payload, decoded header and
# content go to @session.handle_frameset.
#
# @return the session handler's return value, or nil for a heartbeat
def run_once
  method_frame = @transport.read_next_frame
  return if method_frame.is_a?(AMQ::Protocol::HeartbeatFrame)

  if method_frame.final? && !method_frame.method_class.has_content?
    return @session.handle_frame(method_frame.channel, method_frame.decode_payload)
  end

  header_frame = @transport.read_next_frame
  body = ''

  # Keep reading body frames until the announced size has been reached;
  # nothing is read when the header announces an empty body.
  body << @transport.read_next_frame.decode_payload while body.bytesize < header_frame.body_size

  @session.handle_frameset(
    method_frame.channel,
    [method_frame.decode_payload, header_frame.decode_payload, body]
  )
end
start() click to toggle source
# File lib/bunny/reader_loop.rb, line 22
# Spawns the reader thread, which executes #run_loop.
#
# @return [Thread] the newly created thread (also stored in @thread)
def start
  @thread = Thread.new { run_loop }
end
stop() click to toggle source
# File lib/bunny/reader_loop.rb, line 91
# Asks the loop to stop by setting the @stopping flag under the mutex.
#
# @return [true]
def stop
  @mutex.synchronize do
    @stopping = true
  end
end
stopped?() click to toggle source
# File lib/bunny/reader_loop.rb, line 95
# Reads the @stopped flag under the mutex.
#
# @return [Boolean, nil] the current value of @stopped
def stopped?
  @mutex.synchronize do
    @stopped
  end
end
stopping?() click to toggle source
# File lib/bunny/reader_loop.rb, line 99
# Reads the @stopping flag under the mutex.
#
# @return [Boolean, nil] the current value of @stopping
def stopping?
  @mutex.synchronize do
    @stopping
  end
end
terminate_with(e) click to toggle source
# File lib/bunny/reader_loop.rb, line 103
# Flags the loop as stopping, then raises the given exception inside the
# reader thread via #raise.
#
# @param e [Exception] exception to deliver to the reader thread
# @return whatever #raise returns
def terminate_with(e)
  @mutex.synchronize do
    @stopping = true
  end

  self.raise(e)
end

Protected Instance Methods

io_error?(e) click to toggle source
# File lib/bunny/reader_loop.rb, line 136
# Tells whether the given object is one of the I/O-level error types:
# AMQ::Protocol::EmptyResponseError, IOError or SystemCallError
# (subclasses included).
#
# @param e [Object] usually an exception
# @return [Boolean]
def io_error?(e)
  case e
  when AMQ::Protocol::EmptyResponseError, IOError, SystemCallError
    true
  else
    false
  end
end
log_exception(e, level: :error) click to toggle source
# File lib/bunny/reader_loop.rb, line 126
# Logs an exception seen by the reader loop, followed by its backtrace,
# one logger call per line.
#
# Nothing is logged for I/O errors (see #io_error?) while the session is
# closing or already closed.
#
# @param e [Exception] the exception to report
# @param level [Symbol] name of the logger method used for every line
def log_exception(e, level: :error)
  return if io_error?(e) && (@session.closing? || @session.closed?)

  @logger.send level, "Exception in the reader loop: #{e.class.name}: #{e.message}"
  @logger.send level, "Backtrace: "
  # Exception#backtrace is nil for an exception that was never raised;
  # treat that as an empty backtrace instead of failing inside the logger path.
  (e.backtrace || []).each do |line|
    @logger.send level, "\t#{line}"
  end
end
terminate?() click to toggle source
# File lib/bunny/reader_loop.rb, line 142
# Tells whether the loop has been asked to stop or has already stopped.
# Both flags are read under the mutex.
#
# @return [Boolean, nil] truthy when @stopping or @stopped is set
def terminate?
  @mutex.synchronize do
    @stopping || @stopped
  end
end