class GirlFriday::Queue
Constants
- Actor
- Ready
- Shutdown
- Work
Attributes
name[R]
Public Class Methods
immediate!()
click to toggle source
# File lib/girl_friday/work_queue.rb, line 28
# Re-point +push+ and +<<+ at +push_immediately+ for this class, so that
# pushed work is run through the processor inline by the caller.
def self.immediate!
  target = :push_immediately
  alias_method(:push, target)
  alias_method(:<<, target)
end
new(name, options={}, &block)
click to toggle source
# File lib/girl_friday/work_queue.rb, line 9
# Set up a named queue, start its supervisor and register it with GirlFriday.
#
# name    - queue name; stored as a String.
# options - Hash read for :size (worker count, defaults to 5),
#           :error_handler (one class or a list; defaults to
#           ErrorHandler.default), :store (persister class; defaults to
#           Store::InMemory) and :store_config (defaults to {}).
# block   - the processor invoked for each piece of work (required).
#
# Raises ArgumentError when no block is given.
def initialize(name, options={}, &block)
  raise ArgumentError, "#{self.class.name} requires a block" if block.nil?

  @name = name.to_s
  @size = options[:size] || 5
  @processor = block

  # Each configured handler class is instantiated once per queue.
  handler_classes = Array(options[:error_handler] || ErrorHandler.default)
  @error_handlers = handler_classes.map { |klass| klass.new }

  @shutdown = false
  @shutting_down = false
  @busy_workers = []
  @ready_workers = nil # populated lazily by ready_workers
  @created_at = Time.now.to_i
  @total_queued = 0
  @total_errors = 0
  @total_processed = 0

  store_class = options[:store] || Store::InMemory
  store_config = options[:store_config] || {}
  @persister = store_class.new(name, store_config)

  # The queue is registered through a weak reference rather than directly.
  @weakref = WeakRef.new(self)
  start
  GirlFriday.add_queue(@weakref)
end
queue!()
click to toggle source
# File lib/girl_friday/work_queue.rb, line 33
# Re-point +push+ and +<<+ at +push_async+ for this class, so that pushed
# work is handed to the supervisor instead of being run inline.
def self.queue!
  target = :push_async
  alias_method(:push, target)
  alias_method(:<<, target)
end
Public Instance Methods
push_async(work, &block)
click to toggle source
# File lib/girl_friday/work_queue.rb, line 44
# Wrap +work+ and the optional callback block in a Work message and send it
# to the supervisor.
def push_async(work, &block)
  message = Work[work, block]
  @supervisor << message
end
push_immediately(work) { |result| ... }
click to toggle source
# File lib/girl_friday/work_queue.rb, line 38
# Run +work+ through the processor on the calling thread.
#
# Returns the processor's result, or, when a block is given, whatever the
# block returns after being handed that result.
def push_immediately(work, &block)
  outcome = @processor.call(work)
  block ? block.call(outcome) : outcome
end
shutdown(&block)
click to toggle source
# File lib/girl_friday/work_queue.rb, line 74
# Ask the supervisor to shut the queue down; the optional block travels with
# the request as its callback.
def shutdown(&block)
  # Runtime state should never be modified by caller thread,
  # only the Supervisor thread.
  request = Shutdown[block]
  @supervisor << request
end
status()
click to toggle source
# File lib/girl_friday/work_queue.rb, line 50
# Snapshot of the queue's counters.
#
# Returns a Hash with a single entry, keyed by the queue name, whose value
# is a Hash of statistics.
def status
  # Workers are spawned lazily, so the ready pool may not exist yet.
  ready_count = @ready_workers ? @ready_workers.size : 0

  stats = {}
  stats[:pid] = Process.pid
  stats[:pool_size] = @size
  stats[:ready] = ready_count
  stats[:busy] = @busy_workers.size
  stats[:backlog] = @persister.size
  stats[:total_queued] = @total_queued
  stats[:total_processed] = @total_processed
  stats[:total_errors] = @total_errors
  stats[:uptime] = Time.now.to_i - @created_at
  stats[:created_at] = @created_at

  { @name => stats }
end
wait_for_empty()
click to toggle source
Busy-wait until the queue's persisted backlog is empty, polling every 0.1 seconds. Useful for testing.
# File lib/girl_friday/work_queue.rb, line 68
# Block the caller, polling every 0.1s, until the persister reports a size
# of zero. Only the backlog is checked; busy workers are not consulted.
def wait_for_empty
  loop do
    break if @persister.size == 0
    sleep 0.1
  end
end
working?()
click to toggle source
# File lib/girl_friday/work_queue.rb, line 80
# True while at least one worker is marked busy.
def working?
  !@busy_workers.empty?
end
Private Instance Methods
drain()
click to toggle source
# File lib/girl_friday/work_queue.rb, line 165
# Hand backlog items to ready workers until either the backlog or the ready
# pool runs out. Each worker used is moved to the busy list.
def drain
  # give as much work to as many ready workers as possible
  batch = [@persister.size, ready_workers.size].min
  batch.times do
    idle = ready_workers.pop
    @busy_workers.push(idle)
    idle << @persister.pop
  end
end
handle_error(ex)
click to toggle source
# File lib/girl_friday/work_queue.rb, line 90
# Pass +ex+ to every configured error handler, in order.
def handle_error(ex)
  # Redis network error? Log and ignore.
  @error_handlers.each do |error_handler|
    error_handler.handle(ex)
  end
end
on_ready(who)
click to toggle source
# File lib/girl_friday/work_queue.rb, line 95
# Handle a worker reporting that it finished a job.
#
# Counts the completed job. While the queue is running and not shutting
# down, the worker is given the next backlog item (if any) and the backlog
# is drained; otherwise it moves from the busy list to the ready pool.
# Any StandardError raised here goes to the error handlers.
def on_ready(who)
  @total_processed += 1

  # Only touch the backlog while the queue is still accepting work.
  accepting = !shutting_down? && running?
  job = accepting ? @persister.pop : nil

  if job
    who.this << job
    drain
  else
    @busy_workers.delete(who.this)
    ready_workers << who.this
  end
rescue => error
  handle_error(error)
end
on_work(work)
click to toggle source
# File lib/girl_friday/work_queue.rb, line 120
# Handle a newly pushed piece of work.
#
# Counts the push. While the queue is running and not shutting down, a
# ready worker (if any) gets the work directly and the backlog is drained;
# otherwise the work is appended to the persister.
# Any StandardError raised here goes to the error handlers.
def on_work(work)
  @total_queued += 1

  # Only claim a worker while the queue is still accepting work.
  accepting = !shutting_down? && running?
  idle = accepting ? ready_workers.pop : nil

  if idle
    @busy_workers << idle
    idle << work
    drain
  else
    @persister << work
  end
rescue => error
  handle_error(error)
end
ready_workers()
click to toggle source
# File lib/girl_friday/work_queue.rb, line 133
# The pool of idle workers. On first use it is created by spawning @size
# linked actors, each running the worker loop.
def ready_workers
  return @ready_workers if @ready_workers

  # start N workers
  @ready_workers = Array.new(@size) { Actor.spawn_link(&@work_loop) }
end
running?()
click to toggle source
# File lib/girl_friday/work_queue.rb, line 86
# True until the shutdown flag has been set.
def running?
  @shutdown ? false : true
end
shutdown_complete()
click to toggle source
# File lib/girl_friday/work_queue.rb, line 108
# Invoke the stored shutdown callback, if there is one, passing the queue.
# Anything the callback raises (including non-StandardError exceptions) is
# forwarded to the error handlers instead of propagating.
def shutdown_complete
  callback = @when_shutdown
  callback.call(self) if callback
rescue Exception => error
  handle_error(error)
end
shutting_down?()
click to toggle source
# File lib/girl_friday/work_queue.rb, line 116
# True once a shutdown request has been seen; always a strict boolean.
def shutting_down?
  @shutting_down ? true : false
end
start()
click to toggle source
# File lib/girl_friday/work_queue.rb, line 138
# Spawn the supervisor actor and define the loop that workers will run.
#
# The supervisor labels its thread, builds @work_loop, enables exit
# trapping and then runs supervisor_loop. Any exception escaping that loop
# is reported on $stderr.
def start
  @supervisor = Actor.spawn do
    Thread.current[:label] = "#{name}-supervisor"
    boss = Actor.current

    # Body executed by each worker: receive a message, process it, invoke
    # the callback when there is one, then report back as ready.
    @work_loop = Proc.new do
      Thread.current[:label] = "#{name}-worker"
      while running?
        job = Actor.receive
        # The queue may have been shut down while this worker was waiting.
        next unless running?

        outcome = @processor.call(job.msg)
        job.callback.call(outcome) if job.callback
        boss << Ready[Actor.current]
      end
    end

    Actor.trap_exit = true
    begin
      supervisor_loop
    rescue Exception => fatal
      $stderr.print "Fatal error in girl_friday: supervisor for #{name} died.\n"
      $stderr.print("#{fatal}\n")
      $stderr.print("#{fatal.backtrace.join("\n")}\n")
    end
  end
end
supervisor_loop()
click to toggle source
# File lib/girl_friday/work_queue.rb, line 175
# Main loop of the supervisor: receive one message at a time and dispatch
# on its type until a shutdown completes.
#
# Ready                 - forwarded to on_ready.
# Work                  - forwarded to on_work.
# Shutdown              - marks the queue as shutting down. While workers
#                         are still busy, the request is re-sent after
#                         yielding the thread. Once none are busy, the
#                         workers are sent the stop message, the callback
#                         runs, the queue is removed from GirlFriday and
#                         this method returns.
# Actor::DeadActorError - while running, counts the error, replaces the
#                         dead worker, reports the reason and drains.
def supervisor_loop
  loop do
    Actor.receive do |filter|
      filter.when(Ready) { |who| on_ready(who) }

      filter.when(Work) { |work| on_work(work) }

      filter.when(Shutdown) do |stop|
        @shutting_down = true
        if working?
          # Workers are still busy: yield and queue the request again.
          Thread.pass
          shutdown(&stop.callback)
        else
          @shutdown = true
          @when_shutdown = stop.callback
          @busy_workers.each { |worker| worker << stop }
          ready_workers.each { |worker| worker << stop }
          shutdown_complete
          GirlFriday.remove_queue @weakref
          return
        end
      end

      filter.when(Actor::DeadActorError) do |death|
        next unless running?

        # TODO Provide current message contents as error context
        @total_errors += 1
        @busy_workers.delete(death.actor)
        ready_workers << Actor.spawn_link(&@work_loop)
        handle_error(death.reason)
        drain
      end
    end
  end
end