class ThreadJob::Memory::Store
Public Class Methods
new(max_retries=10, logger=Logger.new(STDOUT))
click to toggle source
# File lib/thread_job/backends/memory/store.rb, line 15 def initialize(max_retries=10, logger=Logger.new(STDOUT)) @jobs = {} @failed_jobs = {} @mutex = Mutex.new @logger = logger @max_retries = max_retries end
Public Instance Methods
complete_job(queue_name, job_id)
click to toggle source
# File lib/thread_job/backends/memory/store.rb, line 75 def complete_job(queue_name, job_id) @mutex.synchronize { job = get_job(queue_name, job_id) if job @jobs[queue_name].delete(job) @logger.info("[MemoryStore] job: '#{job.job_name}' has been completed and removed from the queue") end } end
fail_job(queue_name, job_id)
click to toggle source
# File lib/thread_job/backends/memory/store.rb, line 85 def fail_job(queue_name, job_id) @mutex.synchronize { job = get_job(queue_name, job_id) if job job.status = FAILED job.attempts += 1 if job.attempts == @max_retries @failed_jobs[queue_name].push(job) @jobs[queue_name].delete(job) @logger.warn("[MemoryStore] job: '#{job.job_name}' has failed the reached the maximum amount of retries (#{@max_retries}) and is being removed from the queue.") else @logger.info("[MemoryStore] failed job: '#{job.job_name}' has been requeued and attempted #{job.attempts} times") end end } end
get_job(queue_name, job_id)
click to toggle source
# File lib/thread_job/backends/memory/store.rb, line 59 def get_job(queue_name, job_id) found_job = false if @jobs[queue_name] != nil @jobs[queue_name].each do |job| if job.id == job_id found_job = true return job end end end @logger.warn("[MemoryStore] unable to get job: #{job_id} from queue: #{queue_name}") return nil end
poll_for_job(queue_name)
click to toggle source
# File lib/thread_job/backends/memory/store.rb, line 42 def poll_for_job(queue_name) @jobs[queue_name] ||= [] @logger.debug("[MemoryStore] Polling for jobs, #{@jobs[queue_name].length} in the queue") @mutex.synchronize { @jobs[queue_name].each do |record| if record.status == AVAILABLE || (record.status == FAILED && record.attempts < @max_retries) record.status = WORKING @logger.debug("[MemoryStore] Sending job '#{record.job_name}' to the thread pool for work") return {id: record.id, job: record.job, job_name: record.job_name} end end } return nil end
save_job(queue_name, job_name, job)
click to toggle source
# File lib/thread_job/backends/memory/store.rb, line 23 def save_job(queue_name, job_name, job) @mutex.synchronize { queued_jobs = @jobs[queue_name] ||= [] failed_queue_jobs = @failed_jobs[queue_name] ||= [] rec = Memory::Record.new rec.attempts = 0 rec.id = queued_jobs.count + 1 rec.job_name = job_name rec.job = job rec.status = AVAILABLE rec.queue_name = queue_name queued_jobs.push(rec) } @logger.info("[MemoryStore] Saved #{job_name}") end