class Rocketman::JobQueue
Constants
- QUEUE_KEY
Public Class Methods
new()
click to toggle source
# File lib/rocketman/job_queue.rb, line 12 def initialize @storage = Rocketman.configuration.storage @jobs = get_job_queue at_exit { persist_events } if @storage.class.to_s == "Redis" end
Public Instance Methods
schedule(job)
click to toggle source
# File lib/rocketman/job_queue.rb, line 19 def schedule(job) @jobs << job end
Private Instance Methods
get_job_queue()
click to toggle source
# File lib/rocketman/job_queue.rb, line 25 def get_job_queue case @storage.class.to_s when "Redis" rehydrate_events else Queue.new end end
persist_events()
click to toggle source
# File lib/rocketman/job_queue.rb, line 57 def persist_events return if @jobs.empty? puts "Persisting Rocketman events to #{@storage.class}" if Rocketman.configuration.debug intermediary = [] event_count = 0 until @jobs.empty? intermediary << @jobs.pop event_count += 1 end @jobs.close marshalled_json = Marshal.dump(intermediary).to_json # For security measure to prevent remote code execution (only allow contents valid in JSON) @storage.set(QUEUE_KEY, marshalled_json) puts "Persisted #{event_count} events to #{@storage.class}" if Rocketman.configuration.debug end
rehydrate_events()
click to toggle source
# File lib/rocketman/job_queue.rb, line 34 def rehydrate_events queue = Queue.new if raw_data = @storage.get(QUEUE_KEY) puts "Rehydrating Rocketman events from #{@storage.class}" if Rocketman.configuration.debug rehydrate = JSON.restore(raw_data) # For security measure to prevent remote code execution (only allow contents valid in JSON) jobs = Marshal.load(rehydrate) event_count = 0 until jobs.empty? queue << jobs.shift event_count += 1 end puts "Rehydrated #{event_count} events from #{@storage.class}" if Rocketman.configuration.debug @storage.del(QUEUE_KEY) # After rehydration, delete it off Redis end queue end