class Warren::Handler::Broadcast
Class Warren::Broadcast provides a connection pool of threadsafe RabbitMQ channels for broadcasting messages
Public Class Methods
Creates a warren but does not connect.
@param [Hash] server Server config options passes straight to Bunny @param [String] exchange The name of the exchange to connect to @param [Integer] pool_size The connection pool size @param [String,nil] routing_key_prefix The prefix to pass before the routing key.
Can be used to ensure environments remain distinct.
# File lib/warren/handler/broadcast.rb, line 76 def initialize(exchange:, routing_key_prefix:, server: {}, pool_size: 14) super() @server = server @exchange_name = exchange @pool_size = pool_size @routing_key_prefix = routing_key_prefix end
Public Instance Methods
Borrows a RabbitMQ channel, sends a message, and immediately returns it again. Useful if you only need to send one message.
@param [Warren::Message] message The message to broadcast. Must respond to routing_key and payload
@return [Warren::Handler::Broadcast] Returns itself to allow chaining. But you're
probably better off using #with_channel in that case
# File lib/warren/handler/broadcast.rb, line 125 def <<(message) with_channel { |channel| channel << message } self end
Opens a connection to the RabbitMQ server. Will need to be re-initialized after forking.
@return [true] We've connected!
# File lib/warren/handler/broadcast.rb, line 89 def connect reset_pool start_session end
Closes the connection. Call before forking to avoid leaking connections
@return [true] We've disconnected
# File lib/warren/handler/broadcast.rb, line 100 def disconnect close_session end
# File lib/warren/handler/broadcast.rb, line 130 def new_channel(worker_count: 1) Channel.new(session.create_channel(nil, worker_count), exchange: @exchange_name, routing_key_prefix: @routing_key_prefix) end
Yields an {Warren::Handler::Broadcast::Channel} which gets returned to the pool on block closure
@return [void]
@yieldparam [Warren::Handler::Broadcast::Channel] A rabbitMQ channel that sends messages to the configured
exchange
# File lib/warren/handler/broadcast.rb, line 111 def with_channel(&block) connection_pool.with(&block) end
Private Instance Methods
# File lib/warren/handler/broadcast.rb, line 156 def close_session reset_pool @session&.close @session = nil end
# File lib/warren/handler/broadcast.rb, line 145 def connection_pool @connection_pool ||= start_session && ConnectionPool.new(size: @pool_size, timeout: 5) do new_channel end end
# File lib/warren/handler/broadcast.rb, line 162 def reset_pool @connection_pool&.shutdown { |ch| ch.close } @connection_pool = nil end
# File lib/warren/handler/broadcast.rb, line 137 def server_connection ENV.fetch('WARREN_CONNECTION_URI', @server) end
# File lib/warren/handler/broadcast.rb, line 141 def session @session ||= Bunny.new(server_connection) end
# File lib/warren/handler/broadcast.rb, line 151 def start_session session.start true end