class GBDispatch::Runner
Attributes
pool_size[RW]
Public Class Methods
execute(block, options=Hash.new)
click to toggle source
Execute given block. If there is an exception thrown, it log it and crash actor. For more information about error handling, check Celluloid documentation. @param block [Proc] @param options [Hash] @option options [String] :name queue name used for debugging and better logging.
# File lib/gb_dispatch/runner.rb, line 31 def execute(block, options=Hash.new) if defined?(Rails) && defined?(Rails::VERSION::MAJOR) if Rails::VERSION::MAJOR < 5 _execute(block, options) else _execute_rails(block, options) end else _execute(block, options) end end
pool()
click to toggle source
Thread pool for async execution. Pool size is set by default to core numbers or at least 2. You can increase size of pool by setting :pool_size variable before using pool. @return [Concurrent::ThreadPoolExecutor]
# File lib/gb_dispatch/runner.rb, line 11 def pool unless @pool self.pool_size ||=2 threads = [self.pool_size, 2, Concurrent.processor_count].max @pool = Concurrent::ThreadPoolExecutor.new( min_threads: 2, max_threads: threads, max_queue: 10*threads, fallback_policy: :caller_runs ) end @pool end
Private Class Methods
_execute(block, options)
click to toggle source
# File lib/gb_dispatch/runner.rb, line 45 def _execute(block, options) future = Concurrent::Future.new(:executor => self.pool) do _run_block(block, options) end future.execute future.value if future.rejected? raise future.reason end future.value end
_execute_rails(block, options)
click to toggle source
# File lib/gb_dispatch/runner.rb, line 57 def _execute_rails(block, options) if defined?(Rails) && defined?(ActiveSupport::Dependencies) future = Concurrent::Future.new(:executor => self.pool) do Rails.application.executor.wrap do ActiveSupport::Dependencies.interlock.permit_concurrent_loads do _run_block(block, options) end end end future.execute ActiveSupport::Dependencies.interlock.permit_concurrent_loads { future.value } if future.rejected? raise future.reason end future.value else raise 'Failed loading rails!' end end
_run_block(block, options)
click to toggle source
# File lib/gb_dispatch/runner.rb, line 77 def _run_block(block, options) begin name = options[:name] Thread.current[:name] ||= name if name result = block.call result rescue Exception => e if defined?(Raven) Raven.tags_context :gb_dispacth => true Raven.extra_context :dispatch_queue => name Raven.capture_exception(e) end GBDispatch.logger.error "Failed execution of queue #{name} with error #{e.message}" raise e end end