class TavernaPlayer::Worker

Constants

INTERACTION_REGEX

How to get the interaction presentation frame out of the interaction page.

Attributes

run[R]

Public Class Methods

new(run, workflow_file = nil) click to toggle source

The workflow file to be run can be specified explicitly or it will be taken from the workflow model.

# File lib/taverna_player/worker.rb, line 25
def initialize(run, workflow_file = nil)
  @run = run
  @workflow = workflow_file || TavernaPlayer.workflow_proxy.file(@run.workflow)
  @server = "Run not yet initialized"
end

Public Instance Methods

max_attempts() click to toggle source

This tells delayed_job to only try and complete each run once.

# File lib/taverna_player/worker.rb, line 38
def max_attempts
  1
end
perform() click to toggle source
# File lib/taverna_player/worker.rb, line 42
def perform
  return unless run_callback(TavernaPlayer.pre_run_callback, "pre-callback")

  status_message("connect")

  @server = URI.parse(ENV["TAVERNA_URI"] || TavernaPlayer.server_address)
  credentials = server_credentials
  conn_params = TavernaPlayer.server_connection

  begin
    server = T2Server::Server.new(@server, conn_params)
    workflow = File.read(@workflow)
    run = create_run(server, workflow, credentials)

    # If run is nil here then we could have failed or have been cancelled.
    return if run.nil?

    status_message("initialized")

    @run.run_id = run.id
    @run.state = run.status
    @run.create_time = run.create_time
    @run.save

    unless @run.inputs.size == 0
      status_message("inputs")
      @run.inputs.each do |input|
        unless input.file.blank?
          run.input_port(input.name).file = input.file.path
        else
          run.input_port(input.name).value = input.value
        end
      end
    end

    # Just add in all service credentials right now
    TavernaPlayer::ServiceCredential.all.each do |cred|
      run.add_password_credential(cred.uri, cred.login, cred.password)
    end

    status_message("start")
    run.name = @run.name

    # Try and start the run bearing in mind that the server might be at
    # the limit of runs that it can run at once.
    while !run.start
      status_message("busy")

      if cancelled?
        cancel(run)
        return
      end

      sleep(TavernaPlayer.server_retry_interval)
    end

    @run.state = run.status
    @run.start_time = run.start_time
    @run.save

    status_message("running")
    until run.finished?
      sleep(TavernaPlayer.server_poll_interval)
      waiting = false

      if cancelled?
        cancel(run)
        return
      end

      waiting = interactions(run, credentials)

      status_message(waiting ? "interact" : "running")
    end

    status_message("outputs")
    download_outputs(run)
    download_log(run)

    @run.outputs = process_outputs(run)
    @run.finish_time = run.finish_time
    @run.save

    run.delete
  rescue => exception
    failed(exception, run)
    return
  end

  return unless run_callback(TavernaPlayer.post_run_callback, "post-callback")

  @run.state = :finished
  status_message("finished")
end
server() click to toggle source

Return the server address that this worker is using. Used mainly for testing.

# File lib/taverna_player/worker.rb, line 33
def server
  @server.to_s
end

Private Instance Methods

cancel(run = nil) click to toggle source
# File lib/taverna_player/worker.rb, line 352
def cancel(run = nil)
  status_message("cancel")

  unless run.nil?
    download_log(run)
    run.delete
  end

  return unless run_callback(TavernaPlayer.run_cancelled_callback, "cancel-callback")

  @run.state = :cancelled
  @run.finish_time = Time.now
  status_message("cancelled")
end
cancelled?() click to toggle source
# File lib/taverna_player/worker.rb, line 345
def cancelled?
  # Need to poll for updates as the run instance may have been
  # changed in the Rails app.
  @run.reload
  @run.stop
end
create_run(server, workflow, credentials) click to toggle source

Try and create the run bearing in mind that the server might be at the limit of runs that it can hold at once.

# File lib/taverna_player/worker.rb, line 155
def create_run(server, workflow, credentials)
  retries ||= TavernaPlayer.server_connection_error_retries
  server.create_run(workflow, credentials)
rescue T2Server::ServerAtCapacityError
  status_message("full")

  if cancelled?
    cancel
    return
  end

  sleep(TavernaPlayer.server_retry_interval)
  retry
rescue T2Server::ConnectionError => ce
  status_message("network-error")

  if cancelled?
    cancel
    return
  end

  sleep(TavernaPlayer.server_retry_interval)
  unless retries.zero?
    retries -= 1
    retry
  end

  # If we're out of retries, fail the run.
  failed(ce)
end
download_log(run) click to toggle source
# File lib/taverna_player/worker.rb, line 272
def download_log(run)
  Dir.mktmpdir(run.id, Rails.root.join("tmp")) do |tmp_dir|
    tmp_file_name = File.join(tmp_dir, "log.txt")
    begin
      # Only save the log file if it's not empty so as not to confuse
      # Paperclip
      unless run.log(tmp_file_name) == 0
        @run.log = File.new(tmp_file_name)
        @run.save
      end
    rescue T2Server::AttributeNotFoundError
      # We don't care if there's no log but we do want to catch the error!
    end
  end
end
download_outputs(run) click to toggle source
# File lib/taverna_player/worker.rb, line 288
def download_outputs(run)
  Dir.mktmpdir(run.id, Rails.root.join("tmp")) do |tmp_dir|
    tmp_file_name = File.join(tmp_dir, "all.zip")
    run.zip_output(tmp_file_name)
    @run.results = File.new(tmp_file_name)
    @run.save
  end
end
failed(exception, run = nil) click to toggle source
# File lib/taverna_player/worker.rb, line 367
def failed(exception, run = nil)
  begin
    unless run.nil?
      download_log(run)
      run.delete
    end
  rescue
    # Try and grab the log then delete the run from Taverna Server here,
    # but at this point we don't care if we fail...
  end

  unless TavernaPlayer.run_failed_callback.nil?
    status_message("fail-callback")

    begin
      callback(TavernaPlayer.run_failed_callback, @run)
    rescue
      # Again, nothing we can really do here, so...
    end
  end

  backtrace = exception.backtrace.join("\n")
  @run.failure_message = "#{exception.message}\n#{backtrace}"

  @run.finish_time = Time.now

  state = exception.instance_of?(Delayed::WorkerTimeout) ? :timeout : :failed
  @run.state = state
  status_message(state.to_s)
end
interactions(run, credentials) click to toggle source
# File lib/taverna_player/worker.rb, line 186
def interactions(run, credentials)
  wait = false

  run.notifications(:requests).each do |note|
    if @run.has_parent?
      next if note.has_reply? || note.is_notification?

      int = Interaction.find_by_run_id_and_serial(@run.parent_id, note.serial)
      new_int = Interaction.find_or_initialize_by_run_id_and_serial(@run.id, note.serial)

      if new_int.new_record?
        note.reply(int.feed_reply, int.data)
        new_int.displayed = true
        new_int.replied = true
        new_int.feed_reply = int.feed_reply
        new_int.data = int.data
        new_int.save
      end
    else
      int = Interaction.find_or_initialize_by_run_id_and_serial(@run.id, note.serial)

      # Need to catch this here in case some other process has replied.
      if note.has_reply? && !int.replied?
        int.replied = true
        int.save
      end

      unless int.replied?
        if int.page.blank?
          server = run.server
          page = server.read(note.uri, "text/html", credentials)

          INTERACTION_REGEX.match(page) do
            page_uri = $1

            if page_uri.starts_with?(server.uri.to_s)
              page = server.read(URI.parse(page_uri), "text/html", credentials)
              int.page = page.gsub("#{run.interactions_uri.to_s}/pmrpc.js",
              "/assets/taverna_player/application.js")
            else
              int.page_uri = page_uri
            end
          end
        end

        # If this is a pure notification that we've already seen then
        # set it as replied as we don't want it blocking a proper
        # interaction.
        int.replied = true if note.is_notification? && !int.new_record?

        if int.data.blank?
          int.data = note.input_data.force_encoding("UTF-8")
        end

        if !int.feed_reply.blank? && !int.data.blank?
          note.reply(int.feed_reply, int.data)
          int.replied = true
        end

        int.save
      end

      wait = true unless int.replied?
    end
  end

  wait
end
process_outputs(run) click to toggle source
# File lib/taverna_player/worker.rb, line 297
def process_outputs(run)
  outputs = []

  Dir.mktmpdir(run.id, Rails.root.join("tmp")) do |tmp_dir|
    run.output_ports.each_value do |port|
      output = TavernaPlayer::RunPort::Output.new(:name => port.name,
        :depth => port.depth)

      tmp_file_name = File.join(tmp_dir, port.name)

      if port.depth == 0
        if port.type =~ /text/
          output.value = read_file_from_zip(@run.results.path, port.name)
        else
          port_name = port.error? ? "#{port.name}.error" : port.name
          output.file = singleton_output(port_name, tmp_file_name)
        end
      else
        # TODO: Need to rework this so it's not just re-downloading the
        # whole port again. This is the quickest way right now though.
        port.zip("#{tmp_file_name}.zip")
        output.file = File.new("#{tmp_file_name}.zip")
      end

      # Set the output port size and type metadata.
      output.value_type = port.type
      output.value_size = port.size

      outputs << output
    end
  end

  outputs
end
run_callback(cb, message) click to toggle source

Run the specified callback and return false on error so that we know to return out of the worker code completely.

# File lib/taverna_player/worker.rb, line 257
def run_callback(cb, message)
  unless cb.nil?
    status_message(message)
    begin
      callback(cb, @run)
    rescue => exception
      failed(exception)
      return false
    end
  end

  # no errors
  true
end
server_credentials() click to toggle source

Get the credentials for the server

# File lib/taverna_player/worker.rb, line 140
def server_credentials
  creds = ENV["TAVERNA_CREDENTIALS"]

  if creds.nil?
    user = TavernaPlayer.server_username
    pass = TavernaPlayer.server_password
  else
    user, pass = creds.split(':')
  end

  T2Server::HttpBasic.new(user, pass)
end
singleton_output(name, tmp_file) click to toggle source
# File lib/taverna_player/worker.rb, line 332
def singleton_output(name, tmp_file)
  File.open(tmp_file, "wb") do |file|
    file.write read_file_from_zip(@run.results.path, name)
  end

  File.new(tmp_file)
end
status_message(key) click to toggle source
# File lib/taverna_player/worker.rb, line 340
def status_message(key)
  @run.status_message_key = key
  @run.save
end