class Basquiat::Adapters::RabbitMq::Session
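A RabbitMQ session: wraps a channel together with the exchange, queue, publisher and consumer options used to publish and consume messages.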
Attributes
channel [R] (read-only): the channel passed to the constructor.
Public Class Methods
new(channel, session_options = {})
click to toggle source
# File lib/basquiat/adapters/rabbitmq/session.rb, line 10

# Stores the channel and the option hash for later use by the other
# session methods.
#
# @param channel the channel object this session operates on
# @param session_options the session configuration; defaults to an empty hash
def initialize(channel, session_options = {})
  @channel, @options = channel, session_options
end
Public Instance Methods
bind_queue(routing_key)
click to toggle source
# File lib/basquiat/adapters/rabbitmq/session.rb, line 15

# Binds the session queue to the session exchange under +routing_key+.
#
# @param routing_key the routing key passed to the binding
# @return whatever the queue's +bind+ call returns
def bind_queue(routing_key)
  # Resolve the queue first, then the exchange, as the receiver/argument
  # order of the call dictates.
  bound_queue = queue
  bound_queue.bind(exchange, routing_key: routing_key)
end
exchange()
click to toggle source
# File lib/basquiat/adapters/rabbitmq/session.rb, line 44

# Returns the session's topic exchange, creating it through the channel on
# first use and caching it in @exchange afterwards.
#
# Reads :name, :durable and :options from @options[:exchange]; a missing
# :options entry falls back to an empty arguments hash.
def exchange
  @exchange ||= begin
    settings = @options[:exchange]
    channel.topic(settings[:name],
                  durable: settings[:durable],
                  arguments: settings[:options] || {})
  end
end
publish(routing_key, message, props = {})
click to toggle source
# File lib/basquiat/adapters/rabbitmq/session.rb, line 19

# Encodes +message+ with Basquiat::Json.encode and publishes it on the
# session exchange.
#
# The publish call receives +routing_key+, <tt>persistent: true</tt> and the
# current Unix timestamp by default; entries in +props+ are merged last and
# therefore override those defaults.
#
# @param routing_key the routing key sent with the message
# @param message the object handed to Basquiat::Json.encode
# @param props extra publish properties (hash); defaults to an empty hash
# @return whatever the exchange's +publish+ call returns
def publish(routing_key, message, props = {})
  # dig (already used by #queue) keeps a configuration without a :publisher
  # section from raising NoMethodError; no section simply means no confirms.
  # NOTE(review): confirm_select is invoked on every publish while
  # confirmation is enabled -- confirm the client treats repeat calls cheaply.
  channel.confirm_select if @options.dig(:publisher, :confirm)

  defaults = { routing_key: routing_key, persistent: true, timestamp: Time.now.to_i }
  exchange.publish(Basquiat::Json.encode(message), defaults.merge(props))
end
queue()
click to toggle source
# File lib/basquiat/adapters/rabbitmq/session.rb, line 34

# Returns the session queue, creating it through the channel on first use
# and caching it in @queue afterwards.
#
# :name and :durable are looked up with dig, while the arguments are read by
# indexing @options[:queue] directly, so a configuration without a :queue
# entry raises NoMethodError here. A missing :options entry falls back to an
# empty arguments hash.
def queue
  return @queue if @queue

  name = @options.dig(:queue, :name)
  durable = @options.dig(:queue, :durable)
  arguments = @options[:queue][:options] || {}
  @queue = channel.queue(name, durable: durable, arguments: arguments)
end
queue_name()
click to toggle source
# File lib/basquiat/adapters/rabbitmq/session.rb, line 40

# Name reported by the session queue (delegates to queue.name).
#
# @return whatever the queue's +name+ call returns
def queue_name
  session_queue = queue
  session_queue.name
end
subscribe(block: true, manual_ack: @options[:consumer][:manual_ack]) { |message| ... }
click to toggle source
# File lib/basquiat/adapters/rabbitmq/session.rb, line 27

# Subscribes to the session queue and yields every delivery to the caller's
# block, wrapped in a Basquiat::Adapters::RabbitMq::Message.
#
# The channel prefetch is set from @options[:consumer][:prefetch] before
# subscribing.
#
# @param block forwarded unchanged as the +block:+ option of queue.subscribe
# @param manual_ack forwarded as the +manual_ack:+ option; defaults to
#   @options[:consumer][:manual_ack]
# @yield [message] the wrapped delivery
def subscribe(block: true, manual_ack: @options[:consumer][:manual_ack])
  prefetch_count = @options[:consumer][:prefetch]
  channel.prefetch(prefetch_count)

  queue.subscribe(block: block, manual_ack: manual_ack) do |delivery_info, properties, payload|
    # Message.new takes the payload first, then the delivery info and properties.
    wrapped = Basquiat::Adapters::RabbitMq::Message.new(payload, delivery_info, properties)
    yield wrapped
  end
end