class Basquiat::Adapters::RabbitMq::Session

A RabbitMQ session.

Attributes

channel[R]

Public Class Methods

new(channel, session_options = {}) click to toggle source
# File lib/basquiat/adapters/rabbitmq/session.rb, line 10
# Builds a session around an already-open channel.
#
# @param channel the channel object every other method in this class talks to
# @param session_options [Hash] session configuration; other methods read the
#   :exchange, :queue, :publisher and :consumer entries from it
def initialize(channel, session_options = {})
  @channel, @options = channel, session_options
end

Public Instance Methods

bind_queue(routing_key) click to toggle source
# File lib/basquiat/adapters/rabbitmq/session.rb, line 15
# Binds the session queue to the session exchange under the given routing key.
#
# @param routing_key routing key passed to the queue's #bind as :routing_key
# @return whatever the queue's #bind returns
def bind_queue(routing_key)
  # Resolve the queue first, then the exchange, before binding them together.
  target = queue
  source = exchange
  target.bind(source, routing_key: routing_key)
end
exchange() click to toggle source
# File lib/basquiat/adapters/rabbitmq/session.rb, line 44
# Returns the topic exchange for this session, declaring it on first use.
#
# The exchange is created through channel.topic using the :name, :durable and
# :options entries of @options[:exchange]; the result is cached in @exchange.
#
# @return the object returned by channel.topic
def exchange
  return @exchange if @exchange

  settings = @options[:exchange]
  @exchange = channel.topic(settings[:name],
                            durable:   settings[:durable],
                            arguments: settings[:options] || {})
end
publish(routing_key, message, props = {}) click to toggle source
# File lib/basquiat/adapters/rabbitmq/session.rb, line 19
# Publishes a message to the session exchange.
#
# The message is serialized with Basquiat::Json.encode. By default it is sent
# with the given routing key, persistent: true and the current Unix time as
# timestamp; entries in +props+ are merged last and therefore override these.
#
# When @options[:publisher][:confirm] is truthy, publisher confirms are
# requested on the channel the first time this method runs.
#
# @param routing_key routing key used for the message
# @param message object handed to Basquiat::Json.encode
# @param props [Hash] additional publish options
# @return whatever the exchange's #publish returns
def publish(routing_key, message, props = {})
  # Confirm mode is a channel-level setting, so request it once per session
  # rather than re-issuing confirm_select ahead of every single message.
  if @options[:publisher][:confirm] && !@confirm_mode_requested
    channel.confirm_select
    @confirm_mode_requested = true
  end

  exchange.publish(Basquiat::Json.encode(message),
                   { routing_key: routing_key,
                     persistent:  true,
                     timestamp:   Time.now.to_i }.merge(props))
end
queue() click to toggle source
# File lib/basquiat/adapters/rabbitmq/session.rb, line 34
# Returns the queue for this session, declaring it on first use.
#
# The queue is created through channel.queue using the :name, :durable and
# :options entries of @options[:queue]; the result is cached in @queue.
#
# @return the object returned by channel.queue
def queue
  return @queue if @queue

  name      = @options.dig(:queue, :name)
  durable   = @options.dig(:queue, :durable)
  arguments = @options[:queue][:options] || {}
  @queue = channel.queue(name, durable: durable, arguments: arguments)
end
queue_name() click to toggle source
# File lib/basquiat/adapters/rabbitmq/session.rb, line 40
# Name of the session queue, as reported by the queue object itself.
#
# @return whatever the queue's #name returns
def queue_name
  session_queue = queue
  session_queue.name
end
subscribe(block: true, manual_ack: @options[:consumer][:manual_ack]) { |message| ... } click to toggle source
# File lib/basquiat/adapters/rabbitmq/session.rb, line 27
# Consumes the session queue, yielding each delivery wrapped in a
# Basquiat::Adapters::RabbitMq::Message.
#
# The channel prefetch is set from @options[:consumer][:prefetch] before the
# subscription is started.
#
# @param block forwarded to the queue's #subscribe as :block
# @param manual_ack forwarded to the queue's #subscribe as :manual_ack;
#   defaults to @options[:consumer][:manual_ack]
# @yield [message] the wrapped delivery
# @return whatever the queue's #subscribe returns
def subscribe(block: true, manual_ack: @options[:consumer][:manual_ack])
  prefetch_count = @options[:consumer][:prefetch]
  channel.prefetch(prefetch_count)

  queue.subscribe(block: block, manual_ack: manual_ack) do |delivery_info, properties, payload|
    wrapped = Basquiat::Adapters::RabbitMq::Message.new(payload, delivery_info, properties)
    yield wrapped
  end
end