class Volt::Dispatcher

The task dispatcher is responsible for taking incoming messages from the socket channel and dispatching them to the proper handler.

Attributes

volt_app[R]

Public Class Methods

component_last_modified_time() click to toggle source
# File lib/volt/tasks/dispatcher.rb, line 42
def self.component_last_modified_time
  unless @last_modified_time
    component_modified(Time.now.to_i.to_s)
  end

  @last_modified_time
end
component_modified(time) click to toggle source

Mark the last time of the component modification for caching in sprockets

# File lib/volt/tasks/dispatcher.rb, line 34
def self.component_modified(time)
  @last_modified_time = time
end
new(volt_app) click to toggle source
# File lib/volt/tasks/dispatcher.rb, line 16
def initialize(volt_app)
  @volt_app = volt_app

  if Volt.env.test?
    # When testing, we want to run immediately so it blocks and doesn't
    # start the next thread.
    @worker_pool = Concurrent::ImmediateExecutor.new
  else
    @worker_pool = Concurrent::ThreadPoolExecutor.new(
      min_threads: Volt.config.min_worker_threads,
      max_threads: Volt.config.max_worker_threads
    )
  end

  @worker_timeout = Volt.config.worker_timeout || 60
end

Public Instance Methods

close_channel(channel) click to toggle source
# File lib/volt/tasks/dispatcher.rb, line 89
def close_channel(channel)
  QueryTasks.new(@volt_app, channel).close!
end
component_modified(time) click to toggle source
# File lib/volt/tasks/dispatcher.rb, line 38
def component_modified(time)
  self.class.component_modified(time)
end
dispatch(channel, message) click to toggle source

Dispatch takes an incoming Task from the client and runs it on the server, returning the result to the client. Tasks returning a promise will wait to return.

# File lib/volt/tasks/dispatcher.rb, line 53
def dispatch(channel, message)
  # Dispatch the task in the worker pool.  Pas in the meta data
  @worker_pool.post do
    begin
      dispatch_in_thread(channel, message)
    rescue => e
      err = "Worker Thread Exception for #{message}\n"
      err += e.inspect
      err += e.backtrace.join("\n") if e.respond_to?(:backtrace)

      Volt.logger.error(err)
    end
  end
end
safe_method?(klass, method_name) click to toggle source

Check if it is safe to use this method

# File lib/volt/tasks/dispatcher.rb, line 70
def safe_method?(klass, method_name)
  # Make sure the class being called is a Task.
  return false unless klass.ancestors.include?(Task)

  # Make sure the method is defined on the klass we're using and not up the hiearchy.
  #   ^ This check prevents methods like #send, #eval, #instance_eval, #class_eval, etc...
  klass.ancestors.each do |ancestor_klass|
    if ancestor_klass.instance_methods(false).include?(method_name)
      return true
    elsif ancestor_klass == Task
      # We made it to Task and didn't find the method, that means it
      # was defined above Task, so we reject the call.
      return false
    end
  end

  false
end

Private Instance Methods

dispatch_in_thread(channel, message) click to toggle source

Do the actual dispatching, should be running inside of a worker thread at this point.

# File lib/volt/tasks/dispatcher.rb, line 97
def dispatch_in_thread(channel, message)
  callback_id, class_name, method_name, meta_data, *args = message
  method_name = method_name.to_sym

  # Get the class
  klass = Object.send(:const_get, class_name)

  promise = Promise.new
  cookies = nil

  start_time = Time.now.to_f

  # Check that we are calling on a Task class and a method provide at
  # Task or above in the ancestor chain. (so no :send or anything)
  if safe_method?(klass, method_name)
    promise.resolve(nil)

    # Init and send the method
    promise = promise.then do
      result = nil
      Timeout.timeout(klass.__timeout || @worker_timeout) do
        Thread.current['meta'] = meta_data
        begin
          klass_inst = klass.new(@volt_app, channel, self)
          result = klass_inst.send(method_name, *args)
          cookies = klass_inst.fetch_cookies
        ensure
          Thread.current['meta'] = nil
        end
      end

      result
    end

  else
    # Unsafe method
    promise.reject(RuntimeError.new("unsafe method: #{method_name}"))
  end

  # Called after task runs or fails
  finish = proc do |error|
    if error.is_a?(Timeout::Error)
      # re-raise with a message
      error = Timeout::Error.new("Task Timed Out after #{@worker_timeout} seconds: #{message}")
    end

    run_time = ((Time.now.to_f - start_time) * 1000).round(3)
    Volt.logger.log_dispatch(class_name, method_name, run_time, args, error)
  end

  # Run the promise and pass the return value/error back to the client
  promise.then do |result|
    reply = EJSON.stringify(['response', callback_id, result, nil, cookies])
    channel.send_string_message(reply)

    finish.call
  end.fail do |error|
    begin
      finish.call(error)

      begin
        # Try to send, handle error if we can't convert the result to EJSON
        reply = EJSON.stringify(['response', callback_id, nil, error, cookies])

        # use send_string_message, since we stringify here, not on the other
        # side of Drb.
        channel.send_string_message(reply)
      rescue EJSON::NonEjsonType => e
        # Convert the error into a string so it can be serialized to
        # something.
        error = "#{error.class.to_s}: #{error.to_s}"
        channel.send_message('response', callback_id, nil, error, cookies)
      end

    rescue => e
      Volt.logger.error "Error in fail dispatch: #{e.inspect}"
      Volt.logger.error(e.backtrace.join("\n")) if e.respond_to?(:backtrace)
      raise
    end
  end

end