class Volt::Dispatcher
The task dispatcher is responsible for taking incoming messages from the socket channel and dispatching them to the proper handler.
Attributes
volt_app[R]
Public Class Methods
component_last_modified_time()
click to toggle source
# File lib/volt/tasks/dispatcher.rb, line 42 def self.component_last_modified_time unless @last_modified_time component_modified(Time.now.to_i.to_s) end @last_modified_time end
component_modified(time)
click to toggle source
Mark the last time of the component modification for caching in sprockets
# File lib/volt/tasks/dispatcher.rb, line 34 def self.component_modified(time) @last_modified_time = time end
new(volt_app)
click to toggle source
# File lib/volt/tasks/dispatcher.rb, line 16 def initialize(volt_app) @volt_app = volt_app if Volt.env.test? # When testing, we want to run immediately so it blocks and doesn't # start the next thread. @worker_pool = Concurrent::ImmediateExecutor.new else @worker_pool = Concurrent::ThreadPoolExecutor.new( min_threads: Volt.config.min_worker_threads, max_threads: Volt.config.max_worker_threads ) end @worker_timeout = Volt.config.worker_timeout || 60 end
Public Instance Methods
close_channel(channel)
click to toggle source
# File lib/volt/tasks/dispatcher.rb, line 89 def close_channel(channel) QueryTasks.new(@volt_app, channel).close! end
component_modified(time)
click to toggle source
# File lib/volt/tasks/dispatcher.rb, line 38 def component_modified(time) self.class.component_modified(time) end
dispatch(channel, message)
click to toggle source
Dispatch takes an incoming Task
from the client and runs it on the server, returning the result to the client. Tasks
returning a promise will wait to return.
# File lib/volt/tasks/dispatcher.rb, line 53 def dispatch(channel, message) # Dispatch the task in the worker pool. Pas in the meta data @worker_pool.post do begin dispatch_in_thread(channel, message) rescue => e err = "Worker Thread Exception for #{message}\n" err += e.inspect err += e.backtrace.join("\n") if e.respond_to?(:backtrace) Volt.logger.error(err) end end end
safe_method?(klass, method_name)
click to toggle source
Check if it is safe to use this method
# File lib/volt/tasks/dispatcher.rb, line 70 def safe_method?(klass, method_name) # Make sure the class being called is a Task. return false unless klass.ancestors.include?(Task) # Make sure the method is defined on the klass we're using and not up the hiearchy. # ^ This check prevents methods like #send, #eval, #instance_eval, #class_eval, etc... klass.ancestors.each do |ancestor_klass| if ancestor_klass.instance_methods(false).include?(method_name) return true elsif ancestor_klass == Task # We made it to Task and didn't find the method, that means it # was defined above Task, so we reject the call. return false end end false end
Private Instance Methods
dispatch_in_thread(channel, message)
click to toggle source
Do the actual dispatching, should be running inside of a worker thread at this point.
# File lib/volt/tasks/dispatcher.rb, line 97 def dispatch_in_thread(channel, message) callback_id, class_name, method_name, meta_data, *args = message method_name = method_name.to_sym # Get the class klass = Object.send(:const_get, class_name) promise = Promise.new cookies = nil start_time = Time.now.to_f # Check that we are calling on a Task class and a method provide at # Task or above in the ancestor chain. (so no :send or anything) if safe_method?(klass, method_name) promise.resolve(nil) # Init and send the method promise = promise.then do result = nil Timeout.timeout(klass.__timeout || @worker_timeout) do Thread.current['meta'] = meta_data begin klass_inst = klass.new(@volt_app, channel, self) result = klass_inst.send(method_name, *args) cookies = klass_inst.fetch_cookies ensure Thread.current['meta'] = nil end end result end else # Unsafe method promise.reject(RuntimeError.new("unsafe method: #{method_name}")) end # Called after task runs or fails finish = proc do |error| if error.is_a?(Timeout::Error) # re-raise with a message error = Timeout::Error.new("Task Timed Out after #{@worker_timeout} seconds: #{message}") end run_time = ((Time.now.to_f - start_time) * 1000).round(3) Volt.logger.log_dispatch(class_name, method_name, run_time, args, error) end # Run the promise and pass the return value/error back to the client promise.then do |result| reply = EJSON.stringify(['response', callback_id, result, nil, cookies]) channel.send_string_message(reply) finish.call end.fail do |error| begin finish.call(error) begin # Try to send, handle error if we can't convert the result to EJSON reply = EJSON.stringify(['response', callback_id, nil, error, cookies]) # use send_string_message, since we stringify here, not on the other # side of Drb. channel.send_string_message(reply) rescue EJSON::NonEjsonType => e # Convert the error into a string so it can be serialized to # something. error = "#{error.class.to_s}: #{error.to_s}" channel.send_message('response', callback_id, nil, error, cookies) end rescue => e Volt.logger.error "Error in fail dispatch: #{e.inspect}" Volt.logger.error(e.backtrace.join("\n")) if e.respond_to?(:backtrace) raise end end end