class Warren::Handler::Broadcast

Class Warren::Handler::Broadcast provides a connection pool of thread-safe RabbitMQ channels for broadcasting messages.

Public Class Methods

new(exchange:, routing_key_prefix:, server: {}, pool_size: 14)

Creates a warren but does not connect.

@param [Hash] server Server config options passed straight to Bunny
@param [String] exchange The name of the exchange to connect to
@param [Integer] pool_size The connection pool size
@param [String,nil] routing_key_prefix The prefix to place before the routing key. Can be used to ensure environments remain distinct.

Calls superclass method
# File lib/warren/handler/broadcast.rb, line 76
def initialize(exchange:, routing_key_prefix:, server: {}, pool_size: 14)
  super()
  @server = server
  @exchange_name = exchange
  @pool_size = pool_size
  @routing_key_prefix = routing_key_prefix
end
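
The note about keeping environments distinct can be illustrated with a small sketch. This section does not show how the prefix is combined with a message's routing key, so the sketch assumes a dot-separated join that skips a nil prefix. `prefixed_routing_key` is a hypothetical helper for illustration and is not part of Warren.

```ruby
# Hypothetical helper: NOT part of Warren. Assumes the prefix and the
# routing key are joined with a dot, and that a nil prefix is skipped.
def prefixed_routing_key(prefix, routing_key)
  [prefix, routing_key].compact.join('.')
end

puts prefixed_routing_key('staging', 'sample.updated')    # staging.sample.updated
puts prefixed_routing_key('production', 'sample.updated') # production.sample.updated
puts prefixed_routing_key(nil, 'sample.updated')          # sample.updated
```

Under that assumption, consumers bound to `staging.#` would never see production traffic on a shared exchange, which is the separation the parameter description refers to.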

Public Instance Methods

<<(message)

Borrows a RabbitMQ channel from the pool, sends the message, and immediately returns the channel. Useful if you only need to send one message.

@param [Warren::Message] message The message to broadcast. Must respond to routing_key and payload

@return [Warren::Handler::Broadcast] Returns itself to allow chaining, but you're probably better off using #with_channel in that case.
# File lib/warren/handler/broadcast.rb, line 125
def <<(message)
  with_channel { |channel| channel << message }
  self
end
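
The message contract and the chaining behaviour described above can be sketched without a RabbitMQ server. `ExampleMessage` and `RecordingBroadcast` are hypothetical stand-ins written for this illustration. The stand-in only records what it is given, whereas the real handler borrows a pooled channel for each call.

```ruby
# Any object that responds to #routing_key and #payload meets the
# documented contract. ExampleMessage is a hypothetical name.
ExampleMessage = Struct.new(:routing_key, :payload)

# Stand-in for the broadcast handler so the sketch runs without RabbitMQ.
# It records messages instead of publishing them.
class RecordingBroadcast
  attr_reader :sent

  def initialize
    @sent = []
  end

  # Mirrors the documented behaviour: handle the message, return self.
  def <<(message)
    @sent << [message.routing_key, message.payload]
    self
  end
end

handler = RecordingBroadcast.new
created = ExampleMessage.new('sample.created', '{"id":1}')
updated = ExampleMessage.new('sample.updated', '{"id":1}')

# Returning self is what makes this chain work.
handler << created << updated

puts handler.sent.length # 2
```

With the real handler, each `<<` in such a chain checks a channel out of the pool and returns it again, which is why the documentation points to #with_channel when sending several messages.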
connect()

Opens a connection to the RabbitMQ server. Will need to be re-initialized after forking.

@return [true] We've connected!

# File lib/warren/handler/broadcast.rb, line 89
def connect
  reset_pool
  start_session
end
disconnect()

Closes the connection. Call before forking to avoid leaking connections.

@return [true] We've disconnected

# File lib/warren/handler/broadcast.rb, line 100
def disconnect
  close_session
end
new_channel(worker_count: 1)
# File lib/warren/handler/broadcast.rb, line 130
def new_channel(worker_count: 1)
  Channel.new(session.create_channel(nil, worker_count), exchange: @exchange_name,
                                                         routing_key_prefix: @routing_key_prefix)
end
with_channel(&block)

Yields a {Warren::Handler::Broadcast::Channel}, which is returned to the pool when the block exits.

@return [void]

@yieldparam [Warren::Handler::Broadcast::Channel] A RabbitMQ channel that sends messages to the configured exchange
# File lib/warren/handler/broadcast.rb, line 111
def with_channel(&block)
  connection_pool.with(&block)
end
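
The borrow-and-return behaviour of with_channel can be sketched with a tiny pool built on Ruby's thread-safe Queue. `TinyPool` is a hypothetical stand-in written for this illustration. Warren delegates to ConnectionPool, which additionally applies a checkout timeout (5 seconds in the source shown in this section).

```ruby
# Minimal stand-in for a checkout-style pool. TinyPool is hypothetical
# and is NOT the ConnectionPool class that Warren uses.
class TinyPool
  def initialize(size:, &factory)
    @available = Queue.new
    size.times { @available << factory.call }
  end

  # Borrow a resource, yield it, and always hand it back, even on error.
  def with
    resource = @available.pop # blocks until a resource is free
    yield resource
  ensure
    @available << resource if resource
  end

  # Number of resources currently idle in the pool.
  def available
    @available.size
  end
end

pool = TinyPool.new(size: 2) { [] } # each "channel" is just an Array here

pool.with do |channel|
  channel << 'first message'
  puts pool.available # 1, one resource is checked out inside the block
end
puts pool.available   # 2, the resource came back when the block exited
```

Sending several messages inside one block reuses the same borrowed channel, instead of checking one out per message as `<<` on the handler does.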

Private Instance Methods

close_session()
# File lib/warren/handler/broadcast.rb, line 156
def close_session
  reset_pool
  @session&.close
  @session = nil
end
connection_pool()
# File lib/warren/handler/broadcast.rb, line 145
def connection_pool
  @connection_pool ||= start_session && ConnectionPool.new(size: @pool_size, timeout: 5) do
    new_channel
  end
end
reset_pool()
# File lib/warren/handler/broadcast.rb, line 162
def reset_pool
  @connection_pool&.shutdown { |ch| ch.close }
  @connection_pool = nil
end
server_connection()
# File lib/warren/handler/broadcast.rb, line 137
def server_connection
  ENV.fetch('WARREN_CONNECTION_URI', @server)
end
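
The lookup above means a WARREN_CONNECTION_URI environment variable, when set, takes precedence over the server hash passed to the constructor. A minimal sketch of that fallback follows. `resolve_server_connection` is a hypothetical free-standing version that accepts a plain Hash in place of ENV so it can be run without changing the process environment, and the host, port and URI values are illustrative only.

```ruby
# Hypothetical free-standing version of the lookup, for illustration only.
# `env` defaults to ENV; a plain Hash works too because both support #fetch.
def resolve_server_connection(server, env = ENV)
  env.fetch('WARREN_CONNECTION_URI', server)
end

config = { host: 'localhost', port: 5672 } # illustrative values

# Variable unset: the configured hash is returned unchanged.
p resolve_server_connection(config, {})

# Variable set: the URI string wins and the hash is ignored.
p resolve_server_connection(config, { 'WARREN_CONNECTION_URI' => 'amqp://guest:guest@localhost:5672' })
```

Either result is handed to Bunny.new, which accepts both a connection URI string and an options hash.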
session()
# File lib/warren/handler/broadcast.rb, line 141
def session
  @session ||= Bunny.new(server_connection)
end
start_session()
# File lib/warren/handler/broadcast.rb, line 151
def start_session
  session.start
  true
end