class Processor::ProcessRunner::Threads

Attributes

number_of_threads[R]
threads_created[R]

Public Class Methods

new(number_of_threads = 1) click to toggle source
# File lib/processor/process_runner/threads.rb, line 4
# Stores the thread limit and sets up empty bookkeeping for spawned threads.
#
# number_of_threads - value compared against the count of threads created
#                     so far to decide when to join them (default: 1).
def initialize(number_of_threads = 1)
  @number_of_threads = number_of_threads
  @threads = []
  # Start the counter at zero so it is an Integer (not nil) right after
  # construction, before any thread has been created or joined.
  @threads_created = 0
end

Public Instance Methods

call(processor) click to toggle source
# File lib/processor/process_runner/threads.rb, line 9
# Processes every entry of +processor.records+ on its own thread.
#
# Before a thread is started, the threads created so far are joined whenever
# their count has reached +number_of_threads+. Inside a thread, a
# StandardError raised by +processor.process+ is passed to
# +processor.record_error+; when that call yields :redo through
# catch(:command), the same record is processed again.
#
# processor - object responding to +records+, +process(record)+ and
#             +record_error(record, exception)+.
def call(processor)
  join_threads

  begin
    processor.records.each do |record|
      join_threads if threads_created >= number_of_threads

      new_thread(processor, record) do |worker, item|
        begin
          worker.process(item)
        rescue StandardError => error
          directive = catch(:command) { worker.record_error(item, error) }

          # NOTE: redo must stay directly in this thread block; it can not
          # be moved into a method or another block.
          # To break from the records loop, use Exception.
          redo if :redo == directive
        end
      end
    end
  ensure
    # Join the threads that were already created, even when iteration fails.
    join_threads
  end
end

Private Instance Methods

join_threads() click to toggle source
# File lib/processor/process_runner/threads.rb, line 44
# Waits for every tracked thread to finish and resets the bookkeeping.
#
# Thread#join re-raises an exception that terminated the thread. Each thread
# is therefore removed from the list *before* it is joined, so a failed
# thread is reported once and is not joined again by a later call. Threads
# that were not reached stay tracked and are joined by the next call.
#
# Raises whatever exception terminated a joined thread.
def join_threads
  @threads.shift.join until @threads.empty?
ensure
  # Keep the counter in step with the threads still tracked: zero after a
  # clean run, the number of not-yet-joined threads after a failure.
  @threads_created = @threads.size
end
new_thread(processor, record, &block) click to toggle source
# File lib/processor/process_runner/threads.rb, line 39
# Starts a thread that runs +block+ with +processor+ and +record+ as its
# arguments, tracks it for a later join, and bumps the created-thread count.
#
# Returns the updated count of created threads.
def new_thread(processor, record, &block)
  spawned = ::Thread.new(processor, record, &block)
  @threads.push(spawned)
  @threads_created = @threads_created + 1
end