class Skein::Connected

Attributes

connection[R]
context[R]

Properties ===========================================================

ident[R]

Public Class Methods

new(config: nil, connection: nil, context: nil, ident: nil) click to toggle source

Instance Methods =====================================================

# File lib/skein/connected.rb, line 10
def initialize(config: nil, connection: nil, context: nil, ident: nil)
  @mutex = Mutex.new

  @config = config
  @connection_shared = !connection
  @connection = connection

  self.connect
  @channels = [ ]

  @context = context || Skein::Context.new
  @ident = ident || @context.ident(self)
end

Public Instance Methods

channel() click to toggle source
# File lib/skein/connected.rb, line 84
def channel
  @channel ||= self.create_channel
end
close() click to toggle source
# File lib/skein/connected.rb, line 88
def close
  lock do
    @channels.each do |channel|
      begin
        channel.close

      rescue => e
        if (defined?(MarchHare))
          case (e)
          when MarchHare::ChannelLevelException, MarchHare::ChannelAlreadyClosed
            # Ignored since we're finished with the channel anyway
          else
            raise e
          end
        elsif (defined?(Bunny))
          case (e)
          when Bunny::ChannelAlreadyClosed
            # Ignored since we're finished with the channel anyway
          else
            raise e
          end
        else
          raise e
        end
      end
    end

    @channels = [ ]

    unless (@connection_shared)
      @connection&.close
      @connection = nil
    end
  end
end
connect() click to toggle source
# File lib/skein/connected.rb, line 48
def connect
  @connection ||= repeat_until_not_nil do
    @connection_shared = false
    Skein::RabbitMQ.connect(@config)
  end
end
connection_shared?() click to toggle source
# File lib/skein/connected.rb, line 24
def connection_shared?
  @connection_shared
end
create_channel(auto_retry: false) click to toggle source
# File lib/skein/connected.rb, line 61
def create_channel(auto_retry: false)
  channel = begin
    @connection.create_channel

  rescue RuntimeError
    sleep(1)

    self.reconnect

    retry
  end

  if (channel.respond_to?(:prefetch=))
    channel.prefetch = 1
  else
    channel.prefetch(1)
  end

  @channels << channel

  channel
end
lock() { || ... } click to toggle source
# File lib/skein/connected.rb, line 28
def lock
  @mutex.synchronize do
    yield
  end
end
reconnect() click to toggle source
# File lib/skein/connected.rb, line 55
def reconnect
  @connection = nil

  self.connect
end
repeat_until_not_nil(delay: 1.0) { || ... } click to toggle source
# File lib/skein/connected.rb, line 34
def repeat_until_not_nil(delay: 1.0)
  r = nil

  loop do
    r = yield

    break if r

    sleep(delay)
  end

  r
end