class Cuniculus::PubWorker

Each PubWorker runs a background thread that loops, popping jobs from its job queue and publishing their payloads to RabbitMQ. PubWorkers are not instantiated directly; they are created and managed by a {Cuniculus::Dispatcher}.

Public Class Methods

new(config, job_queue, dispatcher_chan)
   # File lib/cuniculus/pub_worker.rb
   def initialize(config, job_queue, dispatcher_chan)
     @config = config
     @job_queue = job_queue
     @dispatcher_chan = dispatcher_chan
     @mutex = Mutex.new
     @thread = nil
   end

Public Instance Methods

alive?()

Whether the background thread is running.

@return [Boolean]
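
The safe-navigation call matters before `start!` has run, when `@thread` is still `nil`: `nil&.alive?` evaluates to `nil`, and `|| false` turns that into a proper Boolean. A minimal sketch of the same pattern, using a hypothetical `SketchWorker` class that is not part of Cuniculus:

```ruby
# Hypothetical stand-in class; only the alive? logic mirrors the method documented here.
class SketchWorker
  def initialize
    @thread = nil
  end

  def start
    @thread = Thread.new { sleep } # sleeps until killed
  end

  def stop
    @thread&.kill
    @thread&.join
  end

  def alive?
    # nil&.alive? is nil, so `|| false` guarantees a Boolean
    @thread&.alive? || false
  end
end

worker = SketchWorker.new
before = worker.alive? # no thread has been created yet
worker.start
during = worker.alive? # thread exists and is sleeping
worker.stop
after = worker.alive?  # thread has been killed and joined
```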

   # File lib/cuniculus/pub_worker.rb
   def alive?
     @thread&.alive? || false
   end

start!(conn)

Declares the exchanges and starts a background thread that consumes jobs and publishes them as messages.

If the RabbitMQ connection it receives is not open, or if declaring the exchanges fails, the background thread is not started and the current timestamp is sent to the dispatcher channel. The dispatcher is then responsible for re-establishing the connection and restarting each of its workers.

@param conn [::Bunny::Session] Connection to RabbitMQ. Expected to be open at this stage.
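
The failure signalling described above can be sketched without a RabbitMQ broker. The example is illustrative only: `FakeConn`, `mark_time` and `start_sketch` are hypothetical stand-ins, the dispatcher channel is assumed to behave like a thread-safe `Queue`, and `Cuniculus.mark_time` is assumed to return a monotonic clock reading.

```ruby
# Sketch only: FakeConn, mark_time and start_sketch are hypothetical stand-ins.
class FakeConn
  def initialize(open)
    @open = open
  end

  def open?
    @open
  end
end

def mark_time
  # Assumed stand-in for Cuniculus.mark_time
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

def start_sketch(conn, dispatcher_chan)
  # Report the failure time instead of starting a thread on a closed connection
  return dispatcher_chan << mark_time unless conn.open?

  :started
end

dispatcher_chan = Queue.new # assumption: the channel is a Queue

start_sketch(FakeConn.new(false), dispatcher_chan)
failure_time = dispatcher_chan.pop # what a dispatcher would receive

ok_result = start_sketch(FakeConn.new(true), dispatcher_chan)
```

With a closed connection the only observable effect is the timestamp on the channel, which leaves the decision about when to reconnect to whoever reads it.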

   # File lib/cuniculus/pub_worker.rb
   def start!(conn)
     return @dispatcher_chan << Cuniculus.mark_time unless conn.open?

     @channel = sync { conn.create_channel }
     @x = sync { @channel.direct(Cuniculus::CUNICULUS_EXCHANGE, { durable: true }) }
     @dlx = sync { @channel.fanout(Cuniculus::CUNICULUS_DLX_EXCHANGE, { durable: true }) }
     @thread = Thread.new { run }
   rescue Bunny::Exception
     @dispatcher_chan << Cuniculus.mark_time
   end

Private Instance Methods

run()

Runs the job-consuming loop. This is called internally by `start!` and executes in a background thread. Each job popped from the job queue is published to RabbitMQ.

The loop ends when the message `:shutdown` is popped from the job queue, or when one of `Cuniculus::Dispatcher::RECOVERABLE_ERRORS` is raised while publishing a message to RabbitMQ. In the latter case, the job is reinserted into the job queue and the time of the publish attempt is sent to the dispatcher channel, so that the dispatcher can try to restart the connection and the workers. In both cases the channel is closed before the thread exits.
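
The requeue-and-signal behaviour can be illustrated with in-memory stand-ins. This is a sketch under stated assumptions, not the library's code: `FlakyExchange` and `PublishError` are hypothetical replacements for a Bunny exchange and for the recoverable error list, and both queues are plain `Queue` objects.

```ruby
# Sketch only: FlakyExchange and PublishError are hypothetical stand-ins for a
# Bunny exchange and for Cuniculus::Dispatcher::RECOVERABLE_ERRORS.
class PublishError < StandardError; end

class FlakyExchange
  attr_reader :published

  def initialize(fail_on:)
    @fail_on = fail_on
    @published = []
  end

  def publish(payload, _opts = {})
    raise PublishError, "connection lost" if payload == @fail_on

    @published << payload
  end
end

job_queue = Queue.new
dispatcher_chan = Queue.new
exchange = FlakyExchange.new(fail_on: "job-2")

%w[job-1 job-2 job-3].each { |job| job_queue << job }

worker = Thread.new do
  loop do
    msg = job_queue.pop
    break if msg == :shutdown

    begin
      attempt_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      exchange.publish(msg, { persistent: true })
    rescue PublishError
      job_queue << msg                # keep the job for a later retry
      dispatcher_chan << attempt_time # tell the dispatcher when it failed
      break
    end
  end
end
worker.join

remaining = [job_queue.pop, job_queue.pop]
```

Note that in this sketch the failed job is pushed to the back of the queue, so it ends up behind any jobs that were already waiting.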

   # File lib/cuniculus/pub_worker.rb
   def run
     loop do
       case msg = @job_queue.pop
       when :shutdown
         break
       else
         xname, payload, routing_key = msg
         exchange = if xname == CUNICULUS_DLX_EXCHANGE
                      sync { @dlx }
                    else
                      sync { @x }
                    end
         begin
           publish_time = Cuniculus.mark_time
           exchange.publish(payload, { routing_key: routing_key, persistent: true })
         rescue *::Cuniculus::Dispatcher::RECOVERABLE_ERRORS
           @job_queue << [xname, payload, routing_key]
           @dispatcher_chan << publish_time
           break
         end
       end
     end
     sync { @channel.close unless @channel.closed? }
   end

sync(&block)
   # File lib/cuniculus/pub_worker.rb
   def sync(&block)
     @mutex.synchronize(&block)
   end
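
The helper wraps `Mutex#synchronize`, so every access to the channel and exchanges made through it is serialized. The reason is not stated here; it appears to protect state that `start!` sets from one thread and `run` reads from another. A generic sketch of the same wrapper, using a hypothetical `Counter` class unrelated to Cuniculus:

```ruby
# Hypothetical class; only the sync helper mirrors the method shown here.
class Counter
  def initialize
    @mutex = Mutex.new
    @count = 0
  end

  def increment
    sync { @count += 1 }
  end

  def count
    sync { @count }
  end

  private

  # Run the block while holding the mutex
  def sync(&block)
    @mutex.synchronize(&block)
  end
end

counter = Counter.new
threads = 4.times.map { Thread.new { 1000.times { counter.increment } } }
threads.each(&:join)
total = counter.count
```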