class Enumerator::Async

Constants

EOQ

Public Class Methods

new(enum, pool_size = nil) click to toggle source
# File lib/enumerator/async.rb, line 9
def initialize(enum, pool_size = nil)
  pool_size = (pool_size || enum.count).to_i
  unless pool_size >= 1
    message = "Thread pool size is invalid! Expected a positive integer but got: #{pool_size}"
    raise ArgumentError, message
  end

  @enum, @pool_size = enum, pool_size
end

Public Instance Methods

each() { |item| ... } click to toggle source
# File lib/enumerator/async.rb, line 32
def each
  raise_error(:each) unless block_given?
  
  queue = SizedQueue.new @pool_size
          
  threads = @pool_size.times.map do
    Thread.new do
      loop do
        item = queue.pop
        item != EOQ ? yield(item) : break
      end
    end
  end
  
  begin
    loop { queue.push @enum.next }
  rescue StopIteration
  ensure
    @pool_size.times { queue.push EOQ }
  end

  threads.each(&:join)
  @enum.rewind
  self
end
map() { |item| ... } click to toggle source
# File lib/enumerator/async.rb, line 68
def map
  raise_error(:map) unless block_given?
  
  [].tap do |outs|
    with_index do |item, index|
      outs[index] = yield(item)
    end
  end
end
size() click to toggle source
# File lib/enumerator/async.rb, line 28
def size
  @enum.size
end
sync() click to toggle source
# File lib/enumerator/async.rb, line 23
def sync
  @enum
end
Also aliased as: to_enum
to_a() click to toggle source
# File lib/enumerator/async.rb, line 19
def to_a
  @enum.to_a
end
to_enum()
Alias for: sync
with_index(start = 0, &work) click to toggle source
# File lib/enumerator/async.rb, line 58
def with_index(start = 0, &work)
  @enum = @enum.with_index(start)
  block_given? ? each(&work) : self
end
with_object(object, &work) click to toggle source
# File lib/enumerator/async.rb, line 63
def with_object(object, &work)
  @enum = @enum.with_object(object)
  block_given? ? (each(&work) and object) : self
end

Private Instance Methods

raise_error(method) click to toggle source
# File lib/enumerator/async.rb, line 80
def raise_error(method)
  raise ArgumentError, "Tried to call async #{method} without a block"
end