class GirlFriday::WorkQueue

Constants

Actor
Ready
Shutdown
Work

Attributes

name[R]

Public Class Methods

immediate!() click to toggle source
# File lib/girl_friday/work_queue.rb, line 28
def self.immediate!
  alias_method :push, :push_immediately
  alias_method :<<, :push_immediately
end
new(name, options={}, &block) click to toggle source
# File lib/girl_friday/work_queue.rb, line 9
def initialize(name, options={}, &block)
  raise ArgumentError, "#{self.class.name} requires a block" unless block_given?
  @name = name.to_s
  @size = options[:size] || 5
  @processor = block
  @error_handlers = (Array(options[:error_handler] || ErrorHandler.default)).map(&:new)

  @shutdown = false
  @shutting_down = false
  @busy_workers = []
  @ready_workers = nil
  @created_at = Time.now.to_i
  @total_processed = @total_errors = @total_queued = 0
  @persister = (options[:store] || Store::InMemory).new(name, (options[:store_config] || {}))
  @weakref = WeakRef.new(self)
  start
  GirlFriday.add_queue @weakref
end
queue!() click to toggle source
# File lib/girl_friday/work_queue.rb, line 33
def self.queue!
  alias_method :push, :push_async
  alias_method :<<, :push_async
end

Public Instance Methods

<<(work, &block)
Alias for: push_async
push(work, &block)
Alias for: push_async
push_async(work, &block) click to toggle source
# File lib/girl_friday/work_queue.rb, line 44
def push_async(work, &block)
  @supervisor << Work[work, block]
end
Also aliased as: push, <<
push_immediately(work) { |result| ... } click to toggle source
# File lib/girl_friday/work_queue.rb, line 38
def push_immediately(work, &block)
  result = @processor.call(work)
  return yield result if block
  result
end
shutdown(&block) click to toggle source
# File lib/girl_friday/work_queue.rb, line 74
def shutdown(&block)
  # Runtime state should never be modified by caller thread,
  # only the Supervisor thread.
  @supervisor << Shutdown[block]
end
status() click to toggle source
# File lib/girl_friday/work_queue.rb, line 50
def status
  { @name => {
      :pid => $$,
      :pool_size => @size,
      :ready => @ready_workers ? @ready_workers.size : 0,
      :busy => @busy_workers.size,
      :backlog => @persister.size,
      :total_queued => @total_queued,
      :total_processed => @total_processed,
      :total_errors => @total_errors,
      :uptime => Time.now.to_i - @created_at,
      :created_at => @created_at,
    }
  }
end
wait_for_empty() click to toggle source

Busy wait for the queue to empty. Useful for testing.

# File lib/girl_friday/work_queue.rb, line 68
def wait_for_empty
  until @persister.size == 0
    sleep 0.1
  end
end
working?() click to toggle source
# File lib/girl_friday/work_queue.rb, line 80
def working?
  @busy_workers.size > 0
end

Private Instance Methods

drain() click to toggle source
# File lib/girl_friday/work_queue.rb, line 165
def drain
  # give as much work to as many ready workers as possible
  todo = [@persister.size, ready_workers.size].min
  todo.times do
    worker = ready_workers.pop
    @busy_workers << worker
    worker << @persister.pop
  end
end
handle_error(ex) click to toggle source
# File lib/girl_friday/work_queue.rb, line 90
def handle_error(ex)
  # Redis network error?  Log and ignore.
  @error_handlers.each { |handler| handler.handle(ex) }
end
on_ready(who) click to toggle source
# File lib/girl_friday/work_queue.rb, line 95
def on_ready(who)
  @total_processed += 1
  if !shutting_down? && running? && work = @persister.pop
    who.this << work
    drain
  else
    @busy_workers.delete(who.this)
    ready_workers << who.this
  end
rescue => ex
  handle_error(ex)
end
on_work(work) click to toggle source
# File lib/girl_friday/work_queue.rb, line 120
def on_work(work)
  @total_queued += 1
  if !shutting_down? && running? && worker = ready_workers.pop
    @busy_workers << worker
    worker << work
    drain
  else
    @persister << work
  end
rescue => ex
  handle_error(ex)
end
ready_workers() click to toggle source
# File lib/girl_friday/work_queue.rb, line 133
def ready_workers
  # start N workers
  @ready_workers ||= Array.new(@size) { Actor.spawn_link(&@work_loop) }
end
running?() click to toggle source
# File lib/girl_friday/work_queue.rb, line 86
def running?
  !@shutdown
end
shutdown_complete() click to toggle source
# File lib/girl_friday/work_queue.rb, line 108
def shutdown_complete
  begin
    @when_shutdown.call(self) if @when_shutdown
  rescue Exception => ex
    handle_error(ex)
  end
end
shutting_down?() click to toggle source
# File lib/girl_friday/work_queue.rb, line 116
def shutting_down?
  !!@shutting_down
end
start() click to toggle source
# File lib/girl_friday/work_queue.rb, line 138
def start
  @supervisor = Actor.spawn do
    Thread.current[:label] = "#{name}-supervisor"
    supervisor = Actor.current
    @work_loop = Proc.new do
      Thread.current[:label] = "#{name}-worker"
      while running? do
        work = Actor.receive
        if running?
          result = @processor.call(work.msg)
          work.callback.call(result) if work.callback
          supervisor << Ready[Actor.current]
        end
      end
    end

    Actor.trap_exit = true
    begin
      supervisor_loop
    rescue Exception => ex
      $stderr.print "Fatal error in girl_friday: supervisor for #{name} died.\n"
      $stderr.print("#{ex}\n")
      $stderr.print("#{ex.backtrace.join("\n")}\n")
    end
  end
end
supervisor_loop() click to toggle source
# File lib/girl_friday/work_queue.rb, line 175
def supervisor_loop
  loop do
    Actor.receive do |f|
      f.when(Ready) do |who|
        on_ready(who)
      end
      f.when(Work) do |work|
        on_work(work)
      end
      f.when(Shutdown) do |stop|
        @shutting_down = true
        if !working?
          @shutdown = true
          @when_shutdown = stop.callback
          @busy_workers.each { |w| w << stop }
          ready_workers.each { |w| w << stop }
          shutdown_complete
          GirlFriday.remove_queue @weakref
          return
        else
          Thread.pass
          shutdown(&stop.callback)
        end
      end
      f.when(Actor::DeadActorError) do |ex|
        if running?
          # TODO Provide current message contents as error context
          @total_errors += 1
          @busy_workers.delete(ex.actor)
          ready_workers << Actor.spawn_link(&@work_loop)
          handle_error(ex.reason)
          drain
        end
      end
    end
  end
end