class InThreads
Run Enumerable methods with blocks in threads
Constants
- INCOMPATIBLE_METHODS
Attributes
enumerable[R]
thread_count[R]
Public Class Methods
new(enumerable, thread_count = 10, &block)
click to toggle source
Calls superclass method
# File lib/in_threads.rb, line 36 def initialize(enumerable, thread_count = 10, &block) super(enumerable) @enumerable, @thread_count = enumerable, thread_count.to_i unless enumerable.is_a?(Enumerable) fail ArgumentError, '`enumerable` should include Enumerable.' end if thread_count < 2 fail ArgumentError, '`thread_count` can\'t be less than 2.' end each(&block) if block end
use(runner, options)
click to toggle source
Specify runner to use
use :run_in_threads_use_block_result, :for => %w[all? any? none? one?]
`:for` is required `:ignore_undefined` ignores methods which are not present in `Enumerable.instance_methods`
# File lib/in_threads.rb, line 62 def use(runner, options) methods = Array(options[:for]) fail 'no methods provided using :for option' if methods.empty? ignore_undefined = options[:ignore_undefined] methods.each do |method| next if ignore_undefined && !enumerable_method?(method) class_eval <<-RUBY, __FILE__, __LINE__ + 1 def #{method}(*args, &block) if block #{runner}(:#{method}, *args, &block) else enumerable.#{method}(*args) end end RUBY end end
Private Class Methods
enumerable_method?(name)
click to toggle source
# File lib/in_threads.rb, line 84 def enumerable_method?(name) Enumerable.method_defined?(name) end
Public Instance Methods
grep(*args, &block)
click to toggle source
Special case method, works by applying `run_in_threads_use_block_result` with map on enumerable returned by blockless run
# File lib/in_threads.rb, line 123 def grep(*args, &block) if block self.class.new(enumerable.grep(*args), thread_count).map(&block) else enumerable.grep(*args) end end
grep_v(*args, &block)
click to toggle source
Special case method, works by applying `run_in_threads_use_block_result` with map on enumerable returned by blockless run
# File lib/in_threads.rb, line 134 def grep_v(*args, &block) if block self.class.new(enumerable.grep_v(*args), thread_count).map(&block) else enumerable.grep_v(*args) end end
in_threads(thread_count = 10, &block)
click to toggle source
Creates new instance using underlying enumerable and new thread_count
# File lib/in_threads.rb, line 50 def in_threads(thread_count = 10, &block) self.class.new(enumerable, thread_count, &block) end
with_progress(title = nil, length = nil, &block)
click to toggle source
befriend with progress gem
# File lib/in_threads.rb, line 144 def with_progress(title = nil, length = nil, &block) ::Progress::WithProgress.new(self, title, length, &block) end
Protected Instance Methods
run_in_threads_ignore_block_result(method, *args, &block)
click to toggle source
Use for methods which don't use block result
# File lib/in_threads.rb, line 220 def run_in_threads_ignore_block_result(method, *args, &block) pool = Pool.new(thread_count) wait = SizedQueue.new(thread_count - 1) begin pool.catch do enumerable.send(method, *args) do |*block_args| pool.run do pool.catch do block.call(*block_args) end wait.pop end wait.push(nil) break if pool.stop? end end ensure pool.finalize if (e = pool.exception) return e.exit_value if e.is_a?(LocalJumpError) && e.reason == :break fail e end end end
run_in_threads_use_block_result(method, *args, &block)
click to toggle source
Use for methods which do use block result
# File lib/in_threads.rb, line 247 def run_in_threads_use_block_result(method, *args, &block) pool = Pool.new(thread_count) enum_a = QueueEnum.new enum_b = QueueEnum.new(thread_count - 1) results = SizedQueue.new(thread_count - 1) filler = filler_thread(pool, [enum_a, enum_b]) runner = runner_thread(pool, enum_a, results, &block) begin pool.catch do enum_b.send(method, *args) do result = results.pop.pop break if pool.stop? result end end ensure pool.stop! enum_a.close(true) enum_b.close(true) results.clear pool.finalize runner.join filler.join if (e = pool.exception) return e.exit_value if e.is_a?(LocalJumpError) && e.reason == :break fail e end end end
Private Instance Methods
filler_thread(pool, enums)
click to toggle source
# File lib/in_threads.rb, line 282 def filler_thread(pool, enums) Thread.new do pool.catch do enumerable.each do |*block_args| enums.each do |enum| enum.push(*block_args) end break if pool.stop? end end enums.each(&:close) end end
runner_thread(pool, enum, results, &block)
click to toggle source
# File lib/in_threads.rb, line 296 def runner_thread(pool, enum, results, &block) Thread.new do enum.each do |*block_args| queue = Queue.new pool.run do queue.push(pool.catch{ block.call(*block_args) }) end results.push(queue) break if pool.stop? end end end