class Bifrost::Worker
This class is used to read messages from the subscriber, and process the messages one by one. This class is a worker/actor which focusses on processes a single topic/subscriber combination one at a time
Attributes
Public Class Methods
A worker can tell you what it's friendly name will be, this is in order for supervision
# File lib/bifrost/worker.rb, line 53 def self.handle(topic, subscriber) "#{topic.downcase}--#{subscriber.downcase}" end
Initialise the worker/actor. This actually starts the worker by implicitly calling the run method
Bifrost::Entity::new
# File lib/bifrost/worker.rb, line 18 def initialize(topic, subscriber, callback, options = {}) raise Bifrost::Exceptions::UnsupportedLambdaError, 'callback is not a proc' unless callback.respond_to?(:call) @topic ||= topic @subscriber ||= subscriber @callback ||= callback @options ||= options super() info("Worker #{self} starting up...") publish('worker_ready', topic, subscriber) end
Public Instance Methods
This method starts the actor, which runs in an infinite loop. This means the worker should not terminate, but if it does, the supervisor will make sure it restarts
# File lib/bifrost/worker.rb, line 31 def run info("Worker #{self} running...") loop do info("Worker #{self} waking up...") if Bifrost.debug? read_message info("Worker #{self} going to sleep...") if Bifrost.debug? sleep(ENV['BIFROST_WORKER_SLEEP'] || 10) end end
Workers have a friendly name which is a combination of the topic and subscriber name which in the operational environment should be unique
# File lib/bifrost/worker.rb, line 43 def to_s Worker.handle(topic, subscriber) end
Utlity method to get the name of the worker as a symbol
# File lib/bifrost/worker.rb, line 48 def to_sym to_s.to_sym end
Private Instance Methods
Actual processing of the message
# File lib/bifrost/worker.rb, line 60 def read_message raw_message = @bus.interface.receive_subscription_message(topic, subscriber, timeout: ENV['BIFROST_TIMEOUT'] || 10) if raw_message info("Worker #{self} picked up message #{raw_message}") if Bifrost.debug? message = Bifrost::Message.new(raw_message) if options[:non_repeatable] @bus.interface.delete_subscription_message(raw_message) callback.call(message) else callback.call(message) @bus.interface.delete_subscription_message(raw_message) end elsif Bifrost.debug? info("Worker #{self} no message...") end end