class RocketJob::Event

RocketJob::Event

Publish and Subscribe to events. Events are published immediately and usually consumed almost immediately by all subscriber processes.

Constants

ALL_EVENTS

Public Class Methods

add_subscriber(subscriber) click to toggle source
# File lib/rocket_job/event.rb, line 110
# Registers +subscriber+ under the event name declared by its class.
#
# Returns the subscriber's object_id, which #unsubscribe accepts as the
# handle for this subscription.
def self.add_subscriber(subscriber)
  event_name = subscriber.class.event_name
  registered = @subscribers[event_name]
  # Write the list back under its key after appending to it.
  @subscribers[event_name] = registered << subscriber
  subscriber.object_id
end
collection_exists?() click to toggle source
# File lib/rocket_job/event.rb, line 152
# Returns whether the database's list of collection names includes this
# model's collection name (compared as a String).
def self.collection_exists?
  existing_names = collection.database.collection_names
  existing_names.include?(collection_name.to_s)
end
convert_to_capped_collection(size) click to toggle source

Convert a non-capped collection to capped

# File lib/rocket_job/event.rb, line 157
# Issues the "convertToCapped" database command for this model's collection.
#
# size: passed through unchanged as the command's "size" value.
#
# Returns whatever the database command call returns.
def self.convert_to_capped_collection(size)
  database = collection.database
  database.command(
    "convertToCapped" => collection_name.to_s,
    "size"            => size
  )
end
create_capped_collection(size: capped_collection_size) click to toggle source

Create the capped collection if it does not exist. If the collection already exists but is not capped, it is converted to a capped collection. Drop the collection before calling this method to re-create it.

# File lib/rocket_job/event.rb, line 99
# Ensures the events collection is a capped collection.
#
# size: size handed to the create call or to the conversion;
#       defaults to capped_collection_size.
#
# - Collection missing: it is created with the capped and size options.
# - Collection present but not capped: it is converted in place.
# - Collection present and already capped: nothing is done.
def self.create_capped_collection(size: capped_collection_size)
  unless collection_exists?
    return collection.client[collection_name, {capped: true, size: size}].create
  end

  convert_to_capped_collection(size) unless collection.capped?
end
listener(time: @load_time) click to toggle source

Indefinitely tail the capped collection looking for new events.

time: the start time from which to start looking for new events.
# File lib/rocket_job/event.rb, line 86
# Runs the event listener on the current thread.
#
# time: start time handed to tail_capped_collection; defaults to @load_time.
#
# Names the current thread, makes sure the capped collection is in place,
# then passes every tailed event to process_event.
def self.listener(time: @load_time)
  Thread.current.name = "rocketjob event"
  create_capped_collection

  logger.info("Event listener started")
  tail_capped_collection(time) do |event|
    process_event(event)
  end
rescue Exception => error
  # Every exception is caught here so that the termination is logged;
  # the same exception is then raised again for the caller.
  logger.error("#listener Event listener is terminating due to unhandled exception", error)
  raise(error)
end
process_event(event) click to toggle source

Process a new event, calling registered subscribers.

# File lib/rocket_job/event.rb, line 138
# Dispatches one received event to the registered subscribers.
#
# Subscribers registered under the event's own name receive
# process_action(action, parameters); subscribers registered under
# ALL_EVENTS receive process_event(name, action, parameters).
#
# A StandardError raised anywhere in the method is logged and not re-raised,
# so dispatch of this event stops at the first failure.
def self.process_event(event)
  logger.info("Event Received", event.attributes)

  if @subscribers.key?(event.name)
    @subscribers[event.name].each do |named_subscriber|
      named_subscriber.process_action(event.action, event.parameters)
    end
  end

  return unless @subscribers.key?(ALL_EVENTS)

  @subscribers[ALL_EVENTS].each do |catch_all|
    catch_all.process_event(event.name, event.action, event.parameters)
  end
rescue StandardError => error
  logger.error("Unknown subscriber. Continuing..", error)
end
subscribe(subscriber) { |subscriber| ... } click to toggle source

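Add a subscriber for its events. Returns a handle to the subscription that can be used to unsubscribe this particular subscription. When a block is given, the subscriber is passed to the block and is automatically unsubscribed once the block completes.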

Example: class MySubscriber

include RocketJob::Subscriber

def hello
  logger.info "Hello Action Received"
end

def show(message:)
  logger.info "Received: #{message}"
end

end

MySubscriber.subscribe

# File lib/rocket_job/event.rb, line 66
# Subscribes +subscriber+ to its events.
#
# Without a block: registers the subscriber and returns the handle from
# add_subscriber.
#
# With a block: registers the subscriber, yields it to the block, and always
# unsubscribes afterwards, even when the block raises. The block's value is
# returned.
def self.subscribe(subscriber)
  return add_subscriber(subscriber) unless block_given?

  handle = nil
  begin
    handle = add_subscriber(subscriber)
    yield(subscriber)
  ensure
    # handle is still nil when add_subscriber itself raised.
    unsubscribe(handle) if handle
  end
end
tail_capped_collection(time) { |event| ... } click to toggle source
# File lib/rocket_job/event.rb, line 116
# Tails the collection with a tailable-await cursor, yielding each document
# with created_at greater than +time+ as an Event, in "$natural" order.
#
# time: lower bound (exclusive) on created_at for the first query.
#
# After each event is read, +time+ advances to that event's created_at. When
# one of the rescued Mongo or timeout errors occurs, the method restarts with
# a new cursor that resumes after the last event read. There is no retry limit.
def self.tail_capped_collection(time)
  with(socket_timeout: long_poll_seconds + 10) do
    query  = {created_at: {"$gt" => time}}
    cursor = collection
             .find(query)
             .await_data
             .cursor_type(:tailable_await)
             .max_await_time_ms(long_poll_seconds * 1000)
             .sort("$natural" => 1)

    cursor.each do |document|
      event = Mongoid::Factory.from_db(Event, document)
      # Remember where we are so a retry resumes after the last message read.
      time = event.created_at
      yield(event)
    end
  end
rescue Mongo::Error::SocketError, Mongo::Error::SocketTimeoutError, Mongo::Error::OperationFailure, Timeout::Error => error
  logger.info("Creating a new cursor and trying again: #{error.class.name} #{error.message}")
  retry
end
unsubscribe(handle) click to toggle source

Unsubscribes a previous subscription

# File lib/rocket_job/event.rb, line 80
# Removes the subscription identified by +handle+.
#
# handle: the object_id returned when the subscriber was added.
#
# Every subscriber list is scanned and any entry whose object_id equals the
# handle is deleted in place.
def self.unsubscribe(handle)
  @subscribers.each_value do |registered|
    registered.delete_if { |candidate| candidate.object_id == handle }
  end
end