class Bifrost::Worker

This class reads messages from the subscriber and processes them one at a time. It is a worker/actor that handles a single topic/subscriber combination.

Attributes

callback[R]
options[R]
subscriber[R]
topic[R]

Public Class Methods

handle(topic, subscriber)

A worker can report what its friendly name will be. The name is needed for supervision.

# File lib/bifrost/worker.rb, line 53
def self.handle(topic, subscriber)
  "#{topic.downcase}--#{subscriber.downcase}"
end
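As a quick illustration of the naming scheme, the sketch below copies the interpolation from handle into a standalone method so it can be run without the gem. The method name worker_handle and the sample topic and subscriber names are hypothetical.

```ruby
# Standalone stand-in for Bifrost::Worker.handle (hypothetical helper name),
# copying the interpolation shown above.
def worker_handle(topic, subscriber)
  "#{topic.downcase}--#{subscriber.downcase}"
end

puts worker_handle('Orders', 'Billing') # prints "orders--billing"
```

Because both parts are downcased, 'Orders' and 'orders' map to the same handle.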

new(topic, subscriber, callback, options = {})

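Initialise the worker/actor. Raises Bifrost::Exceptions::UnsupportedLambdaError unless the callback responds to call. The constructor does not call run directly; it publishes a worker_ready notification, and the worker is started implicitly through the run method.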

Calls superclass method Bifrost::Entity::new
# File lib/bifrost/worker.rb, line 18
def initialize(topic, subscriber, callback, options = {})
  raise Bifrost::Exceptions::UnsupportedLambdaError, 'callback is not a proc' unless callback.respond_to?(:call)
  @topic ||= topic
  @subscriber ||= subscriber
  @callback ||= callback
  @options ||= options
  super()
  info("Worker #{self} starting up...")
  publish('worker_ready', topic, subscriber)
end
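The guard clause at the top of initialize accepts any object that responds to call, not only a Proc. A minimal sketch of that check follows; it assumes a local UnsupportedLambdaError class standing in for Bifrost::Exceptions::UnsupportedLambdaError, and a hypothetical helper named validate_callback.

```ruby
# Hypothetical stand-in for Bifrost::Exceptions::UnsupportedLambdaError.
class UnsupportedLambdaError < StandardError; end

# Mirrors the guard clause in initialize: anything that responds to #call passes.
def validate_callback(callback)
  raise UnsupportedLambdaError, 'callback is not a proc' unless callback.respond_to?(:call)

  callback
end

validate_callback(->(message) { puts message }) # a lambda is accepted
validate_callback(method(:puts))                # so is a Method object

begin
  validate_callback('not callable')
rescue UnsupportedLambdaError => e
  puts e.message # prints "callback is not a proc"
end
```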

Public Instance Methods

run()

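This method starts the actor, which runs in an infinite loop: it reads a message, then sleeps for the number of seconds given by the BIFROST_WORKER_SLEEP environment variable (10 by default). The worker should therefore never terminate, but if it does, the supervisor will make sure it restarts.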

# File lib/bifrost/worker.rb, line 31
def run
  info("Worker #{self} running...")
  loop do
    info("Worker #{self} waking up...") if Bifrost.debug?
    read_message
    info("Worker #{self} going to sleep...") if Bifrost.debug?
    # ENV values are strings, and sleep raises TypeError on a String, so convert first
    sleep((ENV['BIFROST_WORKER_SLEEP'] || 10).to_f)
  end
end
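Environment variables are always strings, whereas Kernel#sleep expects a numeric interval. The sketch below shows one way the polling interval could be resolved before sleeping. The helper name worker_sleep_interval is hypothetical, and it takes a plain hash so that it can be tried without touching the real environment.

```ruby
# Hypothetical helper: resolve the polling interval in seconds from an
# environment-like hash, falling back to the default of 10 used by run.
def worker_sleep_interval(env = ENV)
  Float(env.fetch('BIFROST_WORKER_SLEEP', 10))
end

puts worker_sleep_interval({})                                  # prints 10.0
puts worker_sleep_interval({ 'BIFROST_WORKER_SLEEP' => '2.5' }) # prints 2.5
```

Float() raises ArgumentError on a malformed value such as 'soon', which surfaces a misconfiguration instead of silently sleeping for zero seconds.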

to_s()

Workers have a friendly name that combines the topic and subscriber names. In the operational environment this name should be unique.

# File lib/bifrost/worker.rb, line 43
def to_s
  Worker.handle(topic, subscriber)
end

to_sym()

Utility method to get the name of the worker as a symbol

# File lib/bifrost/worker.rb, line 48
def to_sym
  to_s.to_sym
end

Private Instance Methods

read_message()

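Actual processing of the message. Receives at most one message for the topic/subscriber pair, waiting up to BIFROST_TIMEOUT seconds (10 by default). With the non_repeatable option the message is deleted before the callback is invoked; otherwise it is deleted only after the callback returns.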

# File lib/bifrost/worker.rb, line 60
def read_message
  raw_message = @bus.interface.receive_subscription_message(topic, subscriber, timeout: ENV['BIFROST_TIMEOUT'] || 10)
  if raw_message
    info("Worker #{self} picked up message #{raw_message}") if Bifrost.debug?
    message = Bifrost::Message.new(raw_message)
    if options[:non_repeatable]
      @bus.interface.delete_subscription_message(raw_message)
      callback.call(message)
    else
      callback.call(message)
      @bus.interface.delete_subscription_message(raw_message)
    end
  elsif Bifrost.debug?
    info("Worker #{self} no message...")
  end
end
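The two branches of read_message differ only in whether the delete happens before or after the callback, which matters when the callback raises. The sketch below illustrates that ordering with an in-memory stand-in. FakeBus and process are hypothetical names, not part of Bifrost, and the real bus may behave differently on redelivery.

```ruby
# Hypothetical in-memory stand-in for the bus interface; it only records
# which messages were deleted, in order.
class FakeBus
  attr_reader :log

  def initialize
    @log = []
  end

  def delete_subscription_message(message)
    @log << "delete #{message}"
  end
end

# Mirrors the branch in read_message for an already-received message.
def process(bus, message, callback, non_repeatable: false)
  if non_repeatable
    bus.delete_subscription_message(message)
    callback.call(message)
  else
    callback.call(message)
    bus.delete_subscription_message(message)
  end
end

bus = FakeBus.new
failing = ->(_message) { raise 'boom' }

begin
  process(bus, 'm1', failing)
rescue RuntimeError
  # The callback failed before the delete, so 'm1' was never removed.
end

begin
  process(bus, 'm2', failing, non_repeatable: true)
rescue RuntimeError
  # 'm2' had already been deleted when the callback failed.
end

p bus.log # prints ["delete m2"]
```

In this sketch a failing callback leaves 'm1' on the subscription, while 'm2' is gone for good. That is the trade-off the non_repeatable option selects.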