class Msgr::Connection

Attributes

config[R]
uri[R]

Public Class Methods

new(uri, config, dispatcher) click to toggle source
# File lib/msgr/connection.rb, line 11
# Remember the collaborators handed in by the caller and start with an
# empty list of opened channels. Only instance variables are assigned here.
#
# uri        - stored as-is and exposed through the +uri+ reader.
# config     - stored as-is and exposed through the +config+ reader.
# dispatcher - stored as-is; handed to every binding created later.
def initialize(uri, config, dispatcher)
  @dispatcher = dispatcher
  @config = config
  @uri = uri
  @channels = []
end

Public Instance Methods

bind(routes) click to toggle source
# File lib/msgr/connection.rb, line 87
# Create bindings for the given routes.
#
# When +routes+ is empty nothing is bound and a warning is logged instead;
# otherwise the work is delegated to #bind_all.
#
# Returns the result of #bind_all, or the result of the log call when
# +routes+ is empty.
def bind(routes)
  return bind_all(routes) unless routes.empty?

  log(:warn) do
    "No routes to bound to. Bind will have no effect:\n  #{routes.inspect}"
  end
end
bindings() click to toggle source
# File lib/msgr/connection.rb, line 83
# The list of bindings held by this connection.
#
# The array is created on first access and the same object is returned on
# every later call.
def bindings
  return @bindings if @bindings

  @bindings = []
end
channel(prefetch: 1) click to toggle source
# File lib/msgr/connection.rb, line 38
# Open a new Msgr::Channel on the current connection.
#
# prefetch - value passed to the new channel's +prefetch+ (default: 1).
#
# Every channel created here is appended to @channels, which #close
# iterates over. Returns the new channel.
def channel(prefetch: 1)
  Msgr::Channel.new(config, connection).tap do |created|
    created.prefetch(prefetch)
    @channels << created
  end
end
close() click to toggle source
# File lib/msgr/connection.rb, line 98
# Shut down everything this object opened.
#
# Closes each tracked channel first, then the connection — but only if one
# was ever created, so calling this never opens a connection just to close
# it. Finishes by logging a debug message.
def close
  @channels.each { |open_channel| open_channel.close }

  if @connection
    connection.close
  end

  log(:debug) { 'Closed.' }
end
connect() click to toggle source
# File lib/msgr/connection.rb, line 34
# Make sure the underlying connection exists by calling #connection, which
# creates and starts it on first use.
#
# Returns whatever #connection returns.
def connect
  connection
end
connection() click to toggle source
# File lib/msgr/connection.rb, line 30
# The Bunny client used by this object.
#
# On first access a client is built from +config+ and +start+ is called on
# it; the started client is then cached and returned on every later call.
# If +start+ raises, nothing is cached.
def connection
  return @connection if @connection

  client = ::Bunny.new(config)
  client.start
  @connection = client
end
delete() click to toggle source
# File lib/msgr/connection.rb, line 57
# Call +delete+ on every registered binding.
#
# Does nothing (and returns nil) when there are no bindings; otherwise logs
# how many bindings are affected before iterating over them.
def delete
  registered = bindings
  return if registered.empty?

  log(:debug) { "Delete bindings (#{registered.size})..." }
  registered.each(&:delete)
end
exchange() click to toggle source
# File lib/msgr/connection.rb, line 45
# The exchange used for publishing.
#
# On first access a new channel is opened via #channel and its +exchange+
# is cached; later calls return the cached value without opening another
# channel.
def exchange
  return @exchange if @exchange

  @exchange = channel.exchange
end
publish(message, opts = {}) click to toggle source
# File lib/msgr/connection.rb, line 22
# Publish a message on this connection's exchange.
#
# message - payload; converted with +to_s+ before publishing.
# opts    - Hash of publish options (default: {}). A truthy +:to+ entry is
#           translated into +:routing_key+. +persistent: true+ is always
#           set, overriding any value supplied by the caller.
#
# The caller's +opts+ hash is never modified, so the same hash (even a
# frozen one) can safely be reused across calls.
#
# Returns the result of the debug log call.
def publish(message, opts = {})
  # Work on a copy: translating :to in place would silently rewrite the
  # caller's hash and fail outright on a frozen one.
  options = opts.dup
  options[:routing_key] = options.delete(:to) if options[:to]

  exchange.publish message.to_s, options.merge(persistent: true)

  log(:debug) { "Published message to #{options[:routing_key]}" }
end
purge(**kwargs) click to toggle source
# File lib/msgr/connection.rb, line 65
# Call +purge+ on every registered binding, forwarding the given keyword
# arguments unchanged to each one.
#
# Does nothing (and returns nil) when there are no bindings; otherwise logs
# how many bindings are affected before iterating over them.
def purge(**kwargs)
  registered = bindings
  return if registered.empty?

  log(:debug) { "Purge bindings (#{registered.size})..." }
  registered.each { |bound| bound.purge(**kwargs) }
end
purge_queue(name) click to toggle source
# File lib/msgr/connection.rb, line 73
# Purge the queue with the given name, if it exists.
#
# The queue is looked up in passive mode so that a missing queue is not
# created merely to be purged. A missing queue raises and thereby
# invalidates the channel it was requested on, which is why a fresh channel
# is obtained through #channel on every call (each one is also recorded in
# @channels by #channel).
#
# Returns the result of the queue's +purge+, or nil when Bunny::NotFound is
# raised.
def purge_queue(name)
  fresh_channel = channel
  queue = fresh_channel.queue(name, passive: true)
  queue.purge
rescue Bunny::NotFound
  nil
end
release() click to toggle source
# File lib/msgr/connection.rb, line 49
# Call +release+ on every registered binding.
#
# Does nothing (and returns nil) when there are no bindings; otherwise logs
# how many bindings are affected before iterating over them.
def release
  registered = bindings
  return if registered.empty?

  log(:debug) { "Release bindings (#{registered.size})..." }
  registered.each(&:release)
end
running?() click to toggle source
# File lib/msgr/connection.rb, line 18
# Report whether any bindings are currently registered.
#
# Returns true when #bindings contains at least one truthy element,
# false otherwise.
def running?
  bindings.any?
end

Private Instance Methods

bind_all(routes) click to toggle source
# File lib/msgr/connection.rb, line 106
# Create one Binding per route and append it to #bindings.
#
# Each binding is constructed with this object, the route, and the
# dispatcher given at initialization. Routes are processed in order, so a
# failure part-way through leaves the earlier bindings registered.
#
# Returns +routes+ (the receiver of +each+).
def bind_all(routes)
  routes.each do |route|
    bindings << Binding.new(self, route, @dispatcher)
  end
end