class TavernaPlayer::Worker
Constants
- INTERACTION_REGEX
How to get the interaction presentation frame out of the interaction page.
Attributes
run[R]
Public Class Methods
new(run, workflow_file = nil)
click to toggle source
The workflow file to be run can be specified explicitly or it will be taken from the workflow model.
# File lib/taverna_player/worker.rb, line 25 def initialize(run, workflow_file = nil) @run = run @workflow = workflow_file || TavernaPlayer.workflow_proxy.file(@run.workflow) @server = "Run not yet initialized" end
Public Instance Methods
max_attempts()
click to toggle source
This tells delayed_job to only try and complete each run once.
# File lib/taverna_player/worker.rb, line 38 def max_attempts 1 end
perform()
click to toggle source
# File lib/taverna_player/worker.rb, line 42 def perform return unless run_callback(TavernaPlayer.pre_run_callback, "pre-callback") status_message("connect") @server = URI.parse(ENV["TAVERNA_URI"] || TavernaPlayer.server_address) credentials = server_credentials conn_params = TavernaPlayer.server_connection begin server = T2Server::Server.new(@server, conn_params) workflow = File.read(@workflow) run = create_run(server, workflow, credentials) # If run is nil here then we could have failed or have been cancelled. return if run.nil? status_message("initialized") @run.run_id = run.id @run.state = run.status @run.create_time = run.create_time @run.save unless @run.inputs.size == 0 status_message("inputs") @run.inputs.each do |input| unless input.file.blank? run.input_port(input.name).file = input.file.path else run.input_port(input.name).value = input.value end end end # Just add in all service credentials right now TavernaPlayer::ServiceCredential.all.each do |cred| run.add_password_credential(cred.uri, cred.login, cred.password) end status_message("start") run.name = @run.name # Try and start the run bearing in mind that the server might be at # the limit of runs that it can run at once. while !run.start status_message("busy") if cancelled? cancel(run) return end sleep(TavernaPlayer.server_retry_interval) end @run.state = run.status @run.start_time = run.start_time @run.save status_message("running") until run.finished? sleep(TavernaPlayer.server_poll_interval) waiting = false if cancelled? cancel(run) return end waiting = interactions(run, credentials) status_message(waiting ? "interact" : "running") end status_message("outputs") download_outputs(run) download_log(run) @run.outputs = process_outputs(run) @run.finish_time = run.finish_time @run.save run.delete rescue => exception failed(exception, run) return end return unless run_callback(TavernaPlayer.post_run_callback, "post-callback") @run.state = :finished status_message("finished") end
server()
click to toggle source
Return the server address that this worker is using. Used mainly for testing.
# File lib/taverna_player/worker.rb, line 33 def server @server.to_s end
Private Instance Methods
cancel(run = nil)
click to toggle source
# File lib/taverna_player/worker.rb, line 352 def cancel(run = nil) status_message("cancel") unless run.nil? download_log(run) run.delete end return unless run_callback(TavernaPlayer.run_cancelled_callback, "cancel-callback") @run.state = :cancelled @run.finish_time = Time.now status_message("cancelled") end
cancelled?()
click to toggle source
# File lib/taverna_player/worker.rb, line 345 def cancelled? # Need to poll for updates as the run instance may have been # changed in the Rails app. @run.reload @run.stop end
create_run(server, workflow, credentials)
click to toggle source
Try and create the run bearing in mind that the server might be at the limit of runs that it can hold at once.
# File lib/taverna_player/worker.rb, line 155 def create_run(server, workflow, credentials) retries ||= TavernaPlayer.server_connection_error_retries server.create_run(workflow, credentials) rescue T2Server::ServerAtCapacityError status_message("full") if cancelled? cancel return end sleep(TavernaPlayer.server_retry_interval) retry rescue T2Server::ConnectionError => ce status_message("network-error") if cancelled? cancel return end sleep(TavernaPlayer.server_retry_interval) unless retries.zero? retries -= 1 retry end # If we're out of retries, fail the run. failed(ce) end
download_log(run)
click to toggle source
# File lib/taverna_player/worker.rb, line 272 def download_log(run) Dir.mktmpdir(run.id, Rails.root.join("tmp")) do |tmp_dir| tmp_file_name = File.join(tmp_dir, "log.txt") begin # Only save the log file if it's not empty so as not to confuse # Paperclip unless run.log(tmp_file_name) == 0 @run.log = File.new(tmp_file_name) @run.save end rescue T2Server::AttributeNotFoundError # We don't care if there's no log but we do want to catch the error! end end end
download_outputs(run)
click to toggle source
# File lib/taverna_player/worker.rb, line 288 def download_outputs(run) Dir.mktmpdir(run.id, Rails.root.join("tmp")) do |tmp_dir| tmp_file_name = File.join(tmp_dir, "all.zip") run.zip_output(tmp_file_name) @run.results = File.new(tmp_file_name) @run.save end end
failed(exception, run = nil)
click to toggle source
# File lib/taverna_player/worker.rb, line 367 def failed(exception, run = nil) begin unless run.nil? download_log(run) run.delete end rescue # Try and grab the log then delete the run from Taverna Server here, # but at this point we don't care if we fail... end unless TavernaPlayer.run_failed_callback.nil? status_message("fail-callback") begin callback(TavernaPlayer.run_failed_callback, @run) rescue # Again, nothing we can really do here, so... end end backtrace = exception.backtrace.join("\n") @run.failure_message = "#{exception.message}\n#{backtrace}" @run.finish_time = Time.now state = exception.instance_of?(Delayed::WorkerTimeout) ? :timeout : :failed @run.state = state status_message(state.to_s) end
interactions(run, credentials)
click to toggle source
# File lib/taverna_player/worker.rb, line 186 def interactions(run, credentials) wait = false run.notifications(:requests).each do |note| if @run.has_parent? next if note.has_reply? || note.is_notification? int = Interaction.find_by_run_id_and_serial(@run.parent_id, note.serial) new_int = Interaction.find_or_initialize_by_run_id_and_serial(@run.id, note.serial) if new_int.new_record? note.reply(int.feed_reply, int.data) new_int.displayed = true new_int.replied = true new_int.feed_reply = int.feed_reply new_int.data = int.data new_int.save end else int = Interaction.find_or_initialize_by_run_id_and_serial(@run.id, note.serial) # Need to catch this here in case some other process has replied. if note.has_reply? && !int.replied? int.replied = true int.save end unless int.replied? if int.page.blank? server = run.server page = server.read(note.uri, "text/html", credentials) INTERACTION_REGEX.match(page) do page_uri = $1 if page_uri.starts_with?(server.uri.to_s) page = server.read(URI.parse(page_uri), "text/html", credentials) int.page = page.gsub("#{run.interactions_uri.to_s}/pmrpc.js", "/assets/taverna_player/application.js") else int.page_uri = page_uri end end end # If this is a pure notification that we've already seen then # set it as replied as we don't want it blocking a proper # interaction. int.replied = true if note.is_notification? && !int.new_record? if int.data.blank? int.data = note.input_data.force_encoding("UTF-8") end if !int.feed_reply.blank? && !int.data.blank? note.reply(int.feed_reply, int.data) int.replied = true end int.save end wait = true unless int.replied? end end wait end
process_outputs(run)
click to toggle source
# File lib/taverna_player/worker.rb, line 297 def process_outputs(run) outputs = [] Dir.mktmpdir(run.id, Rails.root.join("tmp")) do |tmp_dir| run.output_ports.each_value do |port| output = TavernaPlayer::RunPort::Output.new(:name => port.name, :depth => port.depth) tmp_file_name = File.join(tmp_dir, port.name) if port.depth == 0 if port.type =~ /text/ output.value = read_file_from_zip(@run.results.path, port.name) else port_name = port.error? ? "#{port.name}.error" : port.name output.file = singleton_output(port_name, tmp_file_name) end else # TODO: Need to rework this so it's not just re-downloading the # whole port again. This is the quickest way right now though. port.zip("#{tmp_file_name}.zip") output.file = File.new("#{tmp_file_name}.zip") end # Set the output port size and type metadata. output.value_type = port.type output.value_size = port.size outputs << output end end outputs end
run_callback(cb, message)
click to toggle source
Run
the specified callback and return false on error so that we know to return out of the worker code completely.
# File lib/taverna_player/worker.rb, line 257 def run_callback(cb, message) unless cb.nil? status_message(message) begin callback(cb, @run) rescue => exception failed(exception) return false end end # no errors true end
server_credentials()
click to toggle source
Get the credentials for the server
# File lib/taverna_player/worker.rb, line 140 def server_credentials creds = ENV["TAVERNA_CREDENTIALS"] if creds.nil? user = TavernaPlayer.server_username pass = TavernaPlayer.server_password else user, pass = creds.split(':') end T2Server::HttpBasic.new(user, pass) end
singleton_output(name, tmp_file)
click to toggle source
# File lib/taverna_player/worker.rb, line 332 def singleton_output(name, tmp_file) File.open(tmp_file, "wb") do |file| file.write read_file_from_zip(@run.results.path, name) end File.new(tmp_file) end
status_message(key)
click to toggle source
# File lib/taverna_player/worker.rb, line 340 def status_message(key) @run.status_message_key = key @run.save end