class RocketJob::Event
Publish and Subscribe to events. Events are published immediately and usually consumed almost immediately by all subscriber processes.
Constants
- ALL_EVENTS
Public Class Methods
add_subscriber(subscriber)
click to toggle source
# File lib/rocket_job/event.rb, line 110 def self.add_subscriber(subscriber) name = subscriber.class.event_name @subscribers[name] = @subscribers[name] << subscriber subscriber.object_id end
collection_exists?()
click to toggle source
# File lib/rocket_job/event.rb, line 152 def self.collection_exists? collection.database.collection_names.include?(collection_name.to_s) end
convert_to_capped_collection(size)
click to toggle source
Convert a non-capped collection to capped
# File lib/rocket_job/event.rb, line 157 def self.convert_to_capped_collection(size) collection.database.command("convertToCapped" => collection_name.to_s, "size" => size) end
create_capped_collection(size: capped_collection_size)
click to toggle source
Create the capped collection only if it does not exist. Drop the collection before calling this method to re-create it.
# File lib/rocket_job/event.rb, line 99 def self.create_capped_collection(size: capped_collection_size) if collection_exists? convert_to_capped_collection(size) unless collection.capped? else collection.client[collection_name, {capped: true, size: size}].create end end
listener(time: @load_time)
click to toggle source
Indefinitely tail the capped collection looking for new events.
time: the start time from which to start looking for new events.
# File lib/rocket_job/event.rb, line 86 def self.listener(time: @load_time) Thread.current.name = "rocketjob event" create_capped_collection logger.info("Event listener started") tail_capped_collection(time) { |event| process_event(event) } rescue Exception => e logger.error("#listener Event listener is terminating due to unhandled exception", e) raise(e) end
process_event(event)
click to toggle source
Process a new event, calling registered subscribers.
# File lib/rocket_job/event.rb, line 138 def self.process_event(event) logger.info("Event Received", event.attributes) if @subscribers.key?(event.name) @subscribers[event.name].each { |subscriber| subscriber.process_action(event.action, event.parameters) } end if @subscribers.key?(ALL_EVENTS) @subscribers[ALL_EVENTS].each { |subscriber| subscriber.process_event(event.name, event.action, event.parameters) } end rescue StandardError => e logger.error("Unknown subscriber. Continuing..", e) end
subscribe(subscriber) { |subscriber| ... }
click to toggle source
Add a subscriber for its events. Returns a handle to the subscription that can be used to unsubscribe this particular subscription
Example: def MySubscriber
include RocketJob::Subscriber def hello logger.info "Hello Action Received" end def show(message:) logger.info "Received: #{message}" end
end
MySubscriber.subscribe
# File lib/rocket_job/event.rb, line 66 def self.subscribe(subscriber) if block_given? begin handle = add_subscriber(subscriber) yield(subscriber) ensure unsubscribe(handle) if handle end else add_subscriber(subscriber) end end
tail_capped_collection(time) { |event| ... }
click to toggle source
# File lib/rocket_job/event.rb, line 116 def self.tail_capped_collection(time) with(socket_timeout: long_poll_seconds + 10) do filter = {created_at: {"$gt" => time}} collection. find(filter). await_data. cursor_type(:tailable_await). max_await_time_ms(long_poll_seconds * 1000). sort("$natural" => 1). each do |doc| event = Mongoid::Factory.from_db(Event, doc) # Recovery will occur from after the last message read time = event.created_at yield(event) end end rescue Mongo::Error::SocketError, Mongo::Error::SocketTimeoutError, Mongo::Error::OperationFailure, Timeout::Error => e logger.info("Creating a new cursor and trying again: #{e.class.name} #{e.message}") retry end
unsubscribe(handle)
click to toggle source
Unsubscribes a previous subscription
# File lib/rocket_job/event.rb, line 80 def self.unsubscribe(handle) @subscribers.each_value { |v| v.delete_if { |i| i.object_id == handle } } end