class InThreads

Run Enumerable methods with blocks in threads

Constants

INCOMPATIBLE_METHODS

Attributes

enumerable[R]
thread_count[R]

Public Class Methods

new(enumerable, thread_count = 10, &block) click to toggle source
Calls superclass method
# File lib/in_threads.rb, line 36
# Stores +enumerable+ and the thread count, validating both.
#
# enumerable:: object to wrap; must be an Enumerable (it is also handed to
#              the superclass initializer)
# thread_count:: number of threads, coerced with +to_i+; the coerced value
#                must be at least 2
# block:: when given, it is immediately passed to +each+
#
# Raises ArgumentError if +enumerable+ is not an Enumerable or if the coerced
# +thread_count+ is less than 2.
def initialize(enumerable, thread_count = 10, &block)
  super(enumerable)
  @enumerable, @thread_count = enumerable, thread_count.to_i
  unless enumerable.is_a?(Enumerable)
    fail ArgumentError, '`enumerable` should include Enumerable.'
  end
  # Validate the coerced value that is actually stored. Comparing the raw
  # argument would raise an unrelated error for inputs such as "5" or nil
  # instead of checking the integer that the instance will use.
  if @thread_count < 2
    fail ArgumentError, '`thread_count` can\'t be less than 2.'
  end

  each(&block) if block
end
use(runner, options) click to toggle source

Specify runner to use

use :run_in_threads_use_block_result, :for => %w[all? any? none? one?]

`:for` is required. `:ignore_undefined` ignores methods which are not present in `Enumerable.instance_methods`.

# File lib/in_threads.rb, line 62
    # Defines, for every method name listed in options[:for], an instance
    # method that calls +runner+ when invoked with a block and calls the same
    # method on +enumerable+ when invoked without one.
    #
    # runner:: name of the runner method, interpolated into the generated code
    # options:: Hash; +:for+ lists the method names (required) and
    #           +:ignore_undefined+ skips names for which enumerable_method?
    #           is false
    #
    # Raises RuntimeError when +:for+ provides no method names.
    def use(runner, options)
      names = Array(options[:for])
      fail 'no methods provided using :for option' if names.empty?

      skip_undefined = options[:ignore_undefined]
      names.each do |name|
        # Only consult enumerable_method? when skipping was requested.
        wanted = !skip_undefined || enumerable_method?(name)
        next unless wanted

        class_eval <<-RUBY, __FILE__, __LINE__ + 1
          def #{name}(*args, &block)
            if block
              #{runner}(:#{name}, *args, &block)
            else
              enumerable.#{name}(*args)
            end
          end
        RUBY
      end
    end

Private Class Methods

enumerable_method?(name) click to toggle source
# File lib/in_threads.rb, line 84
# Tells whether Enumerable defines an instance method called +name+.
def enumerable_method?(name)
  target = Enumerable
  target.method_defined?(name)
end

Public Instance Methods

grep(*args, &block) click to toggle source

Special case method, works by applying `run_in_threads_use_block_result` with map on enumerable returned by blockless run

# File lib/in_threads.rb, line 123
# Special case: without a block, returns the plain +grep+ result of the
# wrapped enumerable. With a block, the matches are collected first and the
# block is then applied via +map+ on a new instance built from those matches
# with the same thread_count.
def grep(*args, &block)
  return enumerable.grep(*args) unless block

  matched = enumerable.grep(*args)
  self.class.new(matched, thread_count).map(&block)
end
grep_v(*args, &block) click to toggle source

Special case method, works by applying `run_in_threads_use_block_result` with map on enumerable returned by blockless run

# File lib/in_threads.rb, line 134
# Special case: without a block, returns the plain +grep_v+ result of the
# wrapped enumerable. With a block, the non-matching items are collected first
# and the block is then applied via +map+ on a new instance built from them
# with the same thread_count.
def grep_v(*args, &block)
  return enumerable.grep_v(*args) unless block

  rejected = enumerable.grep_v(*args)
  self.class.new(rejected, thread_count).map(&block)
end
in_threads(thread_count = 10, &block) click to toggle source

Creates new instance using underlying enumerable and new thread_count

# File lib/in_threads.rb, line 50
# Builds a new instance of the receiver's class around the same underlying
# enumerable, using the given +thread_count+ (10 by default) and forwarding
# +block+ to the constructor.
def in_threads(thread_count = 10, &block)
  klass = self.class
  klass.new(enumerable, thread_count, &block)
end
with_progress(title = nil, length = nil, &block) click to toggle source

Integrates with the progress gem: wraps this instance in `Progress::WithProgress`.

# File lib/in_threads.rb, line 144
# Wraps the receiver in a ::Progress::WithProgress, forwarding +title+,
# +length+ (both nil by default) and +block+ to its constructor.
def with_progress(title = nil, length = nil, &block)
  progress_class = ::Progress::WithProgress
  progress_class.new(self, title, length, &block)
end

Protected Instance Methods

run_in_threads_ignore_block_result(method, *args, &block) click to toggle source

Use for methods which don't use block result

# File lib/in_threads.rb, line 220
# Sends +method+ to the wrapped enumerable and hands every set of yielded
# arguments to +block+ through a Pool created with +thread_count+. The value
# returned by +block+ is discarded and never reaches +method+.
#
# method:: name of the enumerable method to send
# args:: arguments forwarded to that method
# block:: called with each set of yielded arguments inside +pool.run+
#         (presumably on a pool thread -- Pool is defined elsewhere)
#
# After the pool is finalized, an exception recorded by the pool is re-raised,
# except a LocalJumpError with reason +:break+, whose exit value is returned.
def run_in_threads_ignore_block_result(method, *args, &block)
  pool = Pool.new(thread_count)
  # Bounded queue used as a throttle: the iterating side pushes once per item
  # (blocking when the queue is full) and each job pops once after its block
  # call has finished.
  wait = SizedQueue.new(thread_count - 1)
  begin
    pool.catch do
      enumerable.send(method, *args) do |*block_args|
        pool.run do
          pool.catch do
            block.call(*block_args)
          end
          wait.pop
        end
        wait.push(nil)
        # Stop iterating as soon as the pool reports that it has been stopped.
        break if pool.stop?
      end
    end
  ensure
    # Runs on every exit path; a `return` from here replaces the method result.
    pool.finalize
    if (e = pool.exception)
      return e.exit_value if e.is_a?(LocalJumpError) && e.reason == :break

      fail e
    end
  end
end
run_in_threads_use_block_result(method, *args, &block) click to toggle source

Use for methods which do use block result

# File lib/in_threads.rb, line 247
# Runs +method+ for callers that need the block's result (for example the
# predicates listed with `use :run_in_threads_use_block_result`).
#
# The work is split between three parties:
# * a filler thread pushes every item of the wrapped enumerable into both
#   +enum_a+ and +enum_b+;
# * a runner thread reads +enum_a+, schedules +block+ for each item through
#   the pool and pushes one per-item Queue into +results+, in item order;
# * this thread sends +method+ to +enum_b+ with a block that takes the next
#   per-item Queue from +results+ and returns the value popped from it.
#
# method:: name of the method sent to +enum_b+
# args:: arguments forwarded to that method
# block:: the caller's block, executed by the runner thread via the pool
#
# After cleanup, an exception recorded by the pool is re-raised, except a
# LocalJumpError with reason +:break+, whose exit value is returned.
def run_in_threads_use_block_result(method, *args, &block)
  pool = Pool.new(thread_count)
  enum_a = QueueEnum.new
  # NOTE(review): the argument is presumably a size limit for the queue --
  # QueueEnum is defined elsewhere, confirm there.
  enum_b = QueueEnum.new(thread_count - 1)
  results = SizedQueue.new(thread_count - 1)
  filler = filler_thread(pool, [enum_a, enum_b])
  runner = runner_thread(pool, enum_a, results, &block)

  begin
    pool.catch do
      enum_b.send(method, *args) do
        # First pop takes the next per-item queue, second pop waits for the
        # value the runner's job pushed into it.
        result = results.pop.pop
        break if pool.stop?

        result
      end
    end
  ensure
    # Shut everything down before joining the helper threads: stop the pool,
    # close both enums, empty +results+, finalize the pool.
    pool.stop!
    enum_a.close(true)
    enum_b.close(true)
    results.clear
    pool.finalize
    runner.join
    filler.join
    # A `return` from this ensure clause replaces the method result.
    if (e = pool.exception)
      return e.exit_value if e.is_a?(LocalJumpError) && e.reason == :break

      fail e
    end
  end
end

Private Instance Methods

filler_thread(pool, enums) click to toggle source
# File lib/in_threads.rb, line 282
# Starts and returns a thread that iterates the wrapped enumerable inside
# +pool.catch+, pushing the arguments of every yielded item to each member of
# +enums+ and leaving the loop once +pool.stop?+ is true. Afterwards every
# member of +enums+ is closed.
def filler_thread(pool, enums)
  Thread.new do
    pool.catch do
      enumerable.each do |*item|
        enums.each { |target| target.push(*item) }
        break if pool.stop?
      end
    end
    enums.each { |target| target.close }
  end
end
runner_thread(pool, enum, results, &block) click to toggle source
# File lib/in_threads.rb, line 296
# Starts and returns a thread that walks +enum+. For every item it creates a
# fresh Queue, asks the pool to run a job that pushes the result of
# +pool.catch { block.call(...) }+ into that queue, and pushes the queue
# itself onto +results+. The loop ends once +pool.stop?+ is true.
def runner_thread(pool, enum, results, &block)
  Thread.new do
    enum.each do |*item|
      slot = Queue.new
      pool.run do
        value = pool.catch { block.call(*item) }
        slot.push(value)
      end
      results.push(slot)
      break if pool.stop?
    end
  end
end