class Cuniculus::PubWorker
Each PubWorker maintains a background thread that runs a loop, fetching jobs as they reach its job queue and publishing their payloads to RabbitMQ. PubWorkers are not instantiated directly; they are created and managed by a {Cuniculus::Dispatcher}.
Public Class Methods
    # File lib/cuniculus/pub_worker.rb
    def initialize(config, job_queue, dispatcher_chan)
      @config = config
      @job_queue = job_queue
      @dispatcher_chan = dispatcher_chan
      @mutex = Mutex.new
      @thread = nil
    end
Public Instance Methods
Whether the background thread is running.
@return [Boolean]
    # File lib/cuniculus/pub_worker.rb
    def alive?
      @thread&.alive? || false
    end
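The `@thread&.alive? || false` idiom guarantees a strict Boolean even before any thread has been created. The following is a minimal sketch of that behaviour using only the Ruby standard library; `ThreadProbe` and its `stop!` method are hypothetical stand-ins invented for illustration and are not part of Cuniculus.

```ruby
# Hypothetical stand-in class; it only mirrors the alive? idiom shown above.
class ThreadProbe
  def initialize
    @thread = nil
  end

  # Starts a thread that sleeps until it is killed.
  def start!
    @thread = Thread.new { sleep }
  end

  # Kills the thread (if any) and waits for it to finish.
  def stop!
    @thread&.kill
    @thread&.join
  end

  # Safe navigation yields nil when @thread is nil; `|| false` turns that
  # nil into a real Boolean.
  def alive?
    @thread&.alive? || false
  end
end

probe = ThreadProbe.new
before_start = probe.alive?   # no thread has been created yet
probe.start!
while_running = probe.alive?  # thread exists and has not finished
probe.stop!
after_stop = probe.alive?     # thread has been killed and joined
```

Without the trailing `|| false`, the first call would return `nil` rather than `false`, which would contradict the documented `[Boolean]` return type.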
Declares exchanges, and starts a background thread that consumes and publishes messages.
If the RabbitMQ connection it receives is not open, or if declaring the exchanges fails, the background thread is not started and the current timestamp is sent to the dispatcher channel. The dispatcher is then responsible for re-establishing the connection and restarting each of its workers.
@param conn [::Bunny::Session] Connection to RabbitMQ. Expected to be open at this stage.
    # File lib/cuniculus/pub_worker.rb
    def start!(conn)
      return @dispatcher_chan << Cuniculus.mark_time unless conn.open?

      @channel = sync { conn.create_channel }
      @x = sync { @channel.direct(Cuniculus::CUNICULUS_EXCHANGE, { durable: true }) }
      @dlx = sync { @channel.fanout(Cuniculus::CUNICULUS_DLX_EXCHANGE, { durable: true }) }
      @thread = Thread.new { run }
    rescue Bunny::Exception
      @dispatcher_chan << Cuniculus.mark_time
    end
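To illustrate the failure path of `start!`, here is a small sketch in which a worker that is handed a closed connection does not start its thread and instead posts a timestamp to the dispatcher channel. Everything in it is an assumption made for the example: `FakeConn` and `StartSketch` are hypothetical stand-ins, the dispatcher channel is modelled as a plain `Queue`, and a monotonic clock reading stands in for `Cuniculus.mark_time`, whose actual return value is not shown on this page.

```ruby
# Hypothetical connection double exposing only open?.
class FakeConn
  def initialize(is_open)
    @is_open = is_open
  end

  def open?
    @is_open
  end
end

# Hypothetical worker that reproduces only the guard clause of start!.
class StartSketch
  def initialize(dispatcher_chan)
    @dispatcher_chan = dispatcher_chan
    @thread = nil
  end

  def start!(conn)
    unless conn.open?
      # Assumption: a monotonic timestamp stands in for Cuniculus.mark_time.
      @dispatcher_chan << Process.clock_gettime(Process::CLOCK_MONOTONIC)
      return
    end
    @thread = Thread.new { sleep }
  end

  def alive?
    @thread&.alive? || false
  end
end

dispatcher_chan = Queue.new
worker = StartSketch.new(dispatcher_chan)
worker.start!(FakeConn.new(false))

signals_sent = dispatcher_chan.size  # one timestamp was posted
thread_started = worker.alive?       # no thread was created
```

A dispatcher reading from that channel could compare the timestamp against the time of its last reconnection attempt to avoid reconnecting once per worker, although whether Cuniculus does so is not stated here.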
Private Instance Methods
Starts the job consuming loop. This is used internally by `start!` and runs in a background thread. Messages are published to RabbitMQ.
The loop ends when the message `:shutdown` is retrieved from the job queue, or when a recoverable error is raised while publishing a message to RabbitMQ. In the latter case, the job is pushed back onto the job queue and the timestamp taken just before the publish attempt is sent to the dispatcher channel, so that the dispatcher can try to restart the connection and the workers.
    # File lib/cuniculus/pub_worker.rb
    def run
      loop do
        case msg = @job_queue.pop
        when :shutdown
          break
        else
          xname, payload, routing_key = msg
          # Pick the dead-letter exchange or the default one.
          exchange = if xname == CUNICULUS_DLX_EXCHANGE
            sync { @dlx }
          else
            sync { @x }
          end
          begin
            publish_time = Cuniculus.mark_time
            exchange.publish(payload, { routing_key: routing_key, persistent: true })
          rescue *::Cuniculus::Dispatcher::RECOVERABLE_ERRORS
            # Requeue the job, notify the dispatcher, and stop this worker.
            @job_queue << [xname, payload, routing_key]
            @dispatcher_chan << publish_time
            break
          end
        end
      end
      sync { @channel.close unless @channel.closed? }
    end
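The requeue-and-notify behaviour of the loop can be reproduced without RabbitMQ. The sketch below is a simplified model under stated assumptions: `FlakyExchange`, `RECOVERABLE`, and `run_loop` are hypothetical names invented for the example, `IOError` stands in for the contents of `Cuniculus::Dispatcher::RECOVERABLE_ERRORS`, and a monotonic clock reading stands in for `Cuniculus.mark_time`.

```ruby
# Hypothetical exchange double that raises on its Nth publish call.
class FlakyExchange
  attr_reader :published

  def initialize(fail_on)
    @fail_on = fail_on
    @calls = 0
    @published = []
  end

  def publish(payload, opts = {})
    @calls += 1
    raise IOError, "simulated connection loss" if @calls == @fail_on
    @published << [payload, opts[:routing_key]]
  end
end

# Assumption: stand-in for Cuniculus::Dispatcher::RECOVERABLE_ERRORS.
RECOVERABLE = [IOError].freeze

# Simplified version of the run loop: one exchange, no mutex, no channel.
def run_loop(job_queue, dispatcher_chan, exchange)
  loop do
    msg = job_queue.pop
    break if msg == :shutdown

    xname, payload, routing_key = msg
    begin
      publish_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      exchange.publish(payload, { routing_key: routing_key, persistent: true })
    rescue *RECOVERABLE
      job_queue << [xname, payload, routing_key] # put the failed job back
      dispatcher_chan << publish_time            # tell the dispatcher when it failed
      break
    end
  end
end

job_queue = Queue.new
dispatcher_chan = Queue.new
job_queue << ["cuniculus", "job-1", "default"]
job_queue << ["cuniculus", "job-2", "default"]
job_queue << :shutdown

exchange = FlakyExchange.new(2) # the second publish fails
Thread.new { run_loop(job_queue, dispatcher_chan, exchange) }.join

published = exchange.published
remaining = [job_queue.pop, job_queue.pop]
signals = dispatcher_chan.size
```

In this model the failed job is appended to the back of the queue, so it ends up behind the `:shutdown` marker that was already waiting. A plain FIFO requeue therefore does not preserve the original job order.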
Runs the given block while holding the worker's mutex.
    # File lib/cuniculus/pub_worker.rb
    def sync(&block)
      @mutex.synchronize(&block)
    end