class Wayfarer::Processor

Runs jobs.

Attributes

job[R]

Public Class Methods

new(job, frontier, dispatcher) click to toggle source
# File lib/wayfarer/processor.rb, line 21
# Builds a processor around a job and its collaborators.
#
# @param job the job handed to the dispatcher for every URI
# @param frontier the frontier that is cycled, staged to and freed by #run
# @param dispatcher the object whose +dispatch+ is called with the job and a URI
def initialize(job, frontier, dispatcher)
  @dispatcher = dispatcher
  @frontier = frontier
  @job = job
  # The halt flag starts out cleared; #halt! is what sets it.
  @halted = Concurrent::AtomicBoolean.new(false)
end

Public Instance Methods

halt!() click to toggle source

Sets a halt flag.

# File lib/wayfarer/processor.rb, line 35
# Requests that processing stop by setting the halt flag.
#
# #run checks this flag before starting a cycle and after taking each URI
# off the queue, and it also calls this method from its +ensure+ clause.
#
# @return whatever +@halted.make_true+ returns
def halt!
  @halted.make_true
end
halted?() click to toggle source

Whether the halt flag has been set. @return [true, false]

# File lib/wayfarer/processor.rb, line 30
# Reports the current state of the halt flag.
#
# @return [true, false] +true+ once the flag has been set
def halted?
  @halted.true?
end
run(*_uris) click to toggle source

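Runs the job, cycling the frontier until it stops cycling or the halt flag is set. @param [*Array<URI>, *Array<String>] _uris accepted but not used by this method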

# File lib/wayfarer/processor.rb, line 41
# Runs the job by working through the frontier one cycle at a time.
#
# For every cycle the frontier's current URIs are placed on a queue that
# +config.connection_count+ worker threads consume. Each URI is dispatched
# together with the job and the result is passed to
# #handle_dispatch_result. Cycling stops as soon as the halt flag is set or
# +@frontier.cycle+ returns a falsy value.
#
# However the method exits, the halt flag is set and both the frontier and
# the dispatcher's adapter pool are freed.
#
# @param _uris [Array<URI>, Array<String>] accepted but not used here
# @return [nil]
def run(*_uris)
  notify_observers!(FirstCycle.new(@frontier))

  # The halt flag is tested first so a halted processor never cycles again.
  until @halted.true? || !@frontier.cycle
    uris = @frontier.current_uris
    pending = uris.each_with_object(Queue.new) { |uri, q| q.push(uri) }

    notify_observers!(NewCycle.new(uris.count))

    worker = lambda do
      begin
        loop do
          # Non-blocking pop: raises ThreadError once the queue is empty.
          uri = pending.pop(true)
          break if uri.nil? || @halted.true?
          handle_dispatch_result(@dispatcher.dispatch(@job, uri))
        end
      rescue ThreadError
        # Reached by every worker that runs the queue dry.
        notify_observers!(CycleFinished.new)
      end
    end

    @threads = Array.new(config.connection_count) { Thread.new(&worker) }
    @threads.each(&:join)

    notify_observers!(AboutToCycle.new(@frontier.staged_uris.count))
  end
ensure
  halt!
  @frontier.free
  @dispatcher.adapter_pool.free
end

Private Instance Methods

handle_dispatch_result(result) click to toggle source
# File lib/wayfarer/processor.rb, line 76
# Routes a dispatch result to the handler that matches its type.
#
# Results that match none of the four known types are ignored.
#
# @param result the value returned by the dispatcher
# @return the selected handler's return value, or +nil+ when nothing matched
def handle_dispatch_result(result)
  case result
  when Dispatcher::Mismatch
    handle_mismatch(result)
  when Dispatcher::Halt
    handle_halt(result)
  when Dispatcher::Stage
    handle_stage(result)
  when Dispatcher::Error
    handle_error(result)
  end
end
handle_error(error) click to toggle source
# File lib/wayfarer/processor.rb, line 99
# Reports an error result to the observers.
#
# @param error a result object that responds to +exception+
# @return whatever +notify_observers!+ returns
def handle_error(error)
  event = UnhandledError.new(error.exception)
  notify_observers!(event)
end
handle_halt(halt) click to toggle source
# File lib/wayfarer/processor.rb, line 89
# Reports a halt result to the observers, then sets the halt flag.
#
# @param halt a result object that responds to +action+ and +uri+
# @return whatever #halt! returns
def handle_halt(halt)
  event = HaltInitiated.new(halt.action, halt.uri)
  notify_observers!(event)
  halt!
end
handle_mismatch(mismatch) click to toggle source
# File lib/wayfarer/processor.rb, line 85
# Reports a mismatch result to the observers.
#
# @param mismatch a result object that responds to +uri+
# @return whatever +notify_observers!+ returns
def handle_mismatch(mismatch)
  event = MismatchedURI.new(mismatch.uri)
  notify_observers!(event)
end
handle_stage(stage) click to toggle source
# File lib/wayfarer/processor.rb, line 94
# Reports a stage result to the observers and stages its URIs on the
# frontier, unless the halt flag is already set.
#
# The notification is sent whether or not the URIs end up being staged.
#
# @param stage a result object that responds to +uris+
# @return whatever +@frontier.stage+ returns, or +nil+ when halted
def handle_stage(stage)
  notify_observers!(StagingURIs.new(stage.uris.count))
  return if halted?

  @frontier.stage(*stage.uris)
end