class ActiveEncode::EngineAdapters::MatterhornAdapter

Constants

DEFAULT_ARGS

Public Instance Methods

cancel(encode) click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 26
# Asks the Matterhorn client to stop the workflow named by +encode.id+ and
# rebuilds an encode from whatever the client returns.
#
# @param encode [Object] encode whose +id+ identifies the workflow to stop
# @return [Object, nil] a new +encode.class+ instance, or nil when no
#   workflow could be extracted from the response
def cancel(encode)
  stopped_workflow = get_workflow(Rubyhorn.client.stop(encode.id))
  build_encode(stopped_workflow, encode.class)
end
create(encode) click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 8
# Starts a Matterhorn workflow for the given encode.
#
# A Hash input is handed to +createMultipleFiles+; any other input is treated
# as a single URL and ingested with +addMediaPackageWithUrl+, using the URL's
# basename as both file name and title. The workflow defaults to "full" when
# no +:preset+ option is given.
#
# @param encode [Object] responds to +input+, +options+ and +class+
# @return [Object, nil] encode built from the started workflow
def create(encode)
  preset = encode.options[:preset] || "full"
  source = encode.input

  workflow_om =
    if source.is_a?(Hash)
      createMultipleFiles(source, preset)
    else
      name = File.basename(source)
      ingest_args = DEFAULT_ARGS.merge('workflow' => preset, 'url' => source, 'filename' => name, 'title' => name)
      Rubyhorn.client.addMediaPackageWithUrl(ingest_args)
    end

  build_encode(get_workflow(workflow_om), encode.class)
end
find(id, opts = {}) click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 18
# Looks up a workflow by id and wraps it in an encode.
#
# @param id [Object] workflow identifier passed to +fetch_workflow+
# @param opts [Hash] +:cast+ is the class used to instantiate the encode
# @return [Object, nil] the built encode, or nil when no workflow was found
def find(id, opts = {})
  workflow = fetch_workflow(id)
  build_encode(workflow, opts[:cast])
end
list(*filters) click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 22
# Listing encodes is not supported by this adapter yet.
#
# @param filters [Array] accepted but unused
# @raise [NotImplementedError] always
def list(*filters)
  # TODO: implement this
  raise(NotImplementedError)
end
purge(encode) click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 31
# Stops the workflow for +encode+ (best effort) and deletes its streaming
# outputs.
#
# The workflow is obtained from +stop+; if that fails or yields nothing, the
# already-stopped workflow is looked up instead. Errors from either call are
# swallowed on purpose so that one failing does not prevent the other.
#
# @param encode [Object] responds to +id+ and +class+
# @return [Object, nil] encode rebuilt from the purged workflow, or nil when
#   neither call produced a workflow (previously this case raised
#   NoMethodError on nil inside +purge_outputs+)
def purge(encode)
  workflow_om = begin
    Rubyhorn.client.stop(encode.id)
  rescue StandardError
    nil
  end
  workflow_om ||= begin
    Rubyhorn.client.get_stopped_workflow(encode.id)
  rescue StandardError
    nil
  end

  workflow = get_workflow(workflow_om)
  # Nothing could be retrieved, so there is nothing to purge or rebuild.
  return nil if workflow.nil?

  purged_workflow = purge_outputs(workflow)
  #Rubyhorn.client.delete_instance(encode.id) #Delete is not working so workflow instances can always be retrieved later!
  build_encode(purged_workflow, encode.class)
end
remove_output(encode, output_id) click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 39
# Deletes a single output of an encode.
#
# The workflow is fetched first; the purge itself only happens when
# +encode.output+ contains an entry whose +:id+ equals +output_id+.
#
# @param encode [Object] responds to +id+ and +output+
# @param output_id [Object] id of the output to remove
# @return [Hash, nil] the matching output entry, or nil when none matched
def remove_output(encode, output_id)
  workflow = fetch_workflow(encode.id)
  matching = encode.output.find { |candidate| candidate[:id] == output_id }

  if matching.nil?
    nil
  else
    purge_output(workflow, output_id)
    matching
  end
end

Private Instance Methods

build_encode(workflow, cast) click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 73
# Instantiates +cast+ from a workflow and fills in every attribute via the
# +convert_*+ helpers and +calculate_percent_complete+.
#
# @param workflow [Object, nil] workflow XML node
# @param cast [Class] class to instantiate with (input, options)
# @return [Object, nil] the populated encode, or nil for a nil workflow
def build_encode(workflow, cast)
  return nil if workflow.nil?

  cast.new(convert_input(workflow), convert_options(workflow)).tap do |built|
    built.id = convert_id(workflow)
    built.state = convert_state(workflow)
    built.current_operations = convert_current_operations(workflow)
    built.percent_complete = calculate_percent_complete(workflow)
    # Output URLs depend on the options already stored on the encode.
    built.output = convert_output(workflow, built.options)
    built.errors = convert_errors(workflow)
    built.tech_metadata = convert_tech_metadata(workflow)
  end
end
calculate_percent_complete(workflow) click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 201
# Estimates overall progress as a weighted sum over three operation groups:
# "compose" operations (weight 70), operations whose id starts with
# "distribute" (weight 20) and all remaining operations (weight 10).
# Within a group, progress is the share of operations whose state is
# SUCCEEDED or SKIPPED. A group with no operations contributes nothing.
#
# @param workflow [Object] workflow node responding to +xpath+
# @return [Float] weighted completion figure
def calculate_percent_complete(workflow)
  finished = '(@state="SUCCEEDED" or @state="SKIPPED")'
  weighted_groups = [
    [70, '@id="compose"'],
    [20, 'starts-with(@id,"distribute")'],
    [10, '@id!="compose" and not(starts-with(@id,"distribute"))']
  ]

  contributions = weighted_groups.map do |weight, selector|
    completed = workflow.xpath("//operation[#{selector} and #{finished}]").size
    total = workflow.xpath("//operation[#{selector}]").size
    total = 1 if total.zero? # avoid dividing by zero for an empty group
    (weight.to_f / total) * completed
  end

  contributions.inject(:+)
end
convert_current_operations(workflow) click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 126
# Reports the description of the last operation that has left the
# INSTANTIATED state.
#
# @param workflow [Object] workflow node responding to +xpath+
# @return [Array<String>] one-element array with that description, or an
#   empty array when there is none (or it is blank)
def convert_current_operations(workflow)
  started = workflow.xpath('//operation[@state!="INSTANTIATED"]/@description')
  latest = started.last.to_s
  return [] unless latest.present?

  [latest]
end
convert_errors(workflow) click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 131
# Collects the text of every <error> element under <errors>.
#
# @param workflow [Object] workflow node responding to +xpath+
# @return [Array<String>] error messages, empty when there are none
def convert_errors(workflow)
  error_nodes = workflow.xpath('//errors/error/text()')
  error_nodes.map { |message| message.to_s }
end
convert_id(workflow) click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 86
# Reads the workflow's +id+ attribute as a string.
#
# @param workflow [Object] workflow node responding to +attribute+
# @return [String] the id ("" when the attribute lookup yields nil)
def convert_id(workflow)
  id_attribute = workflow.attribute('id')
  id_attribute.to_s
end
convert_input(workflow) click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 103
# Extracts the URL of the "presenter/source" track from the media package.
#
# @param workflow [Object] workflow node responding to +xpath+
# @return [String] the source URL text
def convert_input(workflow)
  # Open question: does this need further handling, given it is a Matterhorn
  # URL that disappears once the workflow is cleaned up?
  source_url = workflow.xpath('mediapackage/media/track[@type="presenter/source"]/url/text()')
  source_url.to_s
end
convert_options(workflow) click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 135
# Builds the encode options from the workflow.
#
# +:preset+ is always set from the <template> text. +:stream_base+ is only
# set when the "avalon.stream_base" property is present (avalon-felix
# specific).
#
# @param workflow [Object] workflow node responding to +xpath+
# @return [Hash] options with +:preset+ and, optionally, +:stream_base+
def convert_options(workflow)
  preset = workflow.xpath('template/text()').to_s
  stream_base = workflow.xpath('//properties/property[@key="avalon.stream_base"]/text()')

  options = { preset: preset }
  options[:stream_base] = stream_base.to_s if stream_base.present?
  options
end
convert_output(workflow, options) click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 112
# Describes every streaming "presenter/delivery" track as an output hash.
#
# Each hash holds the track's technical metadata plus +:id+, +:url+ and
# +:label+ (the tag text starting with "quality"). RTMP URLs are rewritten
# relative to +options[:stream_base]+ when that option is set.
#
# @param workflow [Object] workflow node responding to +xpath+
# @param options [Hash] encode options; only +:stream_base+ is read
# @return [Array<Hash>] one entry per streaming track
def convert_output(workflow, options)
  streaming_tracks = workflow.xpath('//track[@type="presenter/delivery" and tags/tag[text()="streaming"]]')

  streaming_tracks.map do |track|
    quality = track.xpath('tags/tag[starts-with(text(),"quality")]/text()').to_s
    location = track.at("url/text()").to_s
    if location.start_with?("rtmp") && options[:stream_base]
      location = File.join(options[:stream_base], MatterhornRtmpUrl.parse(location).to_path)
    end
    identifier = track.at("@id").to_s
    convert_track_metadata(track).merge(id: identifier, url: location, label: quality)
  end
end
convert_state(workflow) click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 90
# Maps the workflow's +state+ attribute onto an encode state symbol.
#
# A FAILED workflow without any FAILED operation is reported as cancelled.
#
# @param workflow [Object] responds to +attribute+ and +xpath+
# @return [Symbol, nil] :running, :cancelled, :failed or :completed; nil for
#   any state not listed here
def convert_state(workflow)
  state = workflow.attribute('state').to_s

  if %w[INSTANTIATED RUNNING].include?(state) # Should there be a queued state?
    :running
  elsif state == "STOPPED"
    :cancelled
  elsif state == "FAILED"
    failed_operations = workflow.xpath('//operation[@state="FAILED"]')
    failed_operations.empty? ? :cancelled : :failed
  elsif %w[SUCCEEDED SKIPPED].include?(state) # Should there be an errored state?
    :completed
  end
end
convert_tech_metadata(workflow) click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 108
# Technical metadata of the first "presenter/source" track.
#
# @param workflow [Object] workflow node responding to +xpath+
# @return [Hash] whatever +convert_track_metadata+ returns for that track
def convert_tech_metadata(workflow)
  source_track = workflow.xpath('//track[@type="presenter/source"]').first
  convert_track_metadata(source_track)
end
convert_track_metadata(track) click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 142
# Turns a <track> node into a flat hash of technical metadata.
#
# +:mime_type+, +:checksum+ and +:duration+ are included only when the
# corresponding child element exists. Audio and video keys are included as a
# group when the track has an <audio> / <video> child. +:width+ and +:height+
# come from splitting the resolution text on "x".
#
# @param track [Object, nil] track node responding to +at+
# @return [Hash] metadata with string values; {} for a nil track
def convert_track_metadata(track)
  return {} if track.nil?

  text_at = ->(path) { track.at(path).to_s }
  metadata = {}

  { mime_type: 'mimetype', checksum: 'checksum', duration: 'duration' }.each do |key, element|
    metadata[key] = text_at.call("#{element}/text()") if track.at(element)
  end

  if track.at('audio')
    metadata[:audio_codec] = text_at.call("audio/encoder/@type")
    metadata[:audio_channels] = text_at.call("audio/channels/text()")
    metadata[:audio_bitrate] = text_at.call("audio/bitrate/text()")
  end

  if track.at('video')
    dimensions = text_at.call("video/resolution/text()").split('x')
    metadata[:video_codec] = text_at.call("video/encoder/@type")
    metadata[:video_bitrate] = text_at.call("video/bitrate/text()")
    metadata[:video_framerate] = text_at.call("video/framerate/text()")
    metadata[:width] = dimensions[0]
    metadata[:height] = dimensions[1]
  end

  metadata
end
createMultipleFiles(input, workflow_id) click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 223
# Ingests several derivative files as one media package and starts a
# workflow on it.
#
# A Dublin Core catalog carrying the title (basename of the first URL) is
# attached, then one track per input entry is added and tagged with its
# quality label. If starting by definition id is rejected with HTTP 400, the
# workflow definition XML is fetched and submitted explicitly instead.
#
# @param input [Hash] quality label => file URL
# @param workflow_id [String] id of the workflow definition to run
# @return [Object] whatever +Rubyhorn.client.start+ returns
# @raise [StandardError] "Unable to start workflow" when the definition
#   cannot be found during the fallback
def createMultipleFiles(input, workflow_id)
  #Create empty media package xml document
  mp = Rubyhorn.client.createMediaPackage

  #Next line associates workflow title to avalon via masterfile pid
  # The title is spliced into XML markup below, so &, < and > must be escaped;
  # otherwise a file name containing them yields a malformed catalog.
  title = File.basename(input.values.first).gsub(/[&<>]/, '&' => '&amp;', '<' => '&lt;', '>' => '&gt;')
  dc = Nokogiri::XML('<dublincore xmlns="http://www.opencastproject.org/xsd/1.0/dublincore/" xmlns:dcterms="http://purl.org/dc/terms/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"><dcterms:title>' + title + '</dcterms:title></dublincore>')
  mp = Rubyhorn.client.addDCCatalog({'mediaPackage' => mp.to_xml, 'dublinCore' => dc.to_xml, 'flavor' => 'dublincore/episode'})

  #Add quality levels - repeated for each supplied file url
  input.each_pair do |quality, url|
    mp = Rubyhorn.client.addTrack({'mediaPackage' => mp.to_xml, 'url' => url, 'flavor' => DEFAULT_ARGS['flavor']})
    #Rewrite track to include quality tag
    #Get the empty tags element under the newly added track
    tags = mp.xpath('//xmlns:track/xmlns:tags[not(node())]', 'xmlns' => 'http://mediapackage.opencastproject.org').first
    quality_tag = Nokogiri::XML::Node.new 'tag', mp
    quality_tag.content = quality
    tags.add_child quality_tag
  end

  #Finally ingest the media package
  begin
    Rubyhorn.client.start({"definitionId" => workflow_id, "mediapackage" => mp.to_xml})
  rescue Rubyhorn::RestClient::Exceptions::HTTPBadRequest
    #make this two calls...one to get the workflow definition xml and then the second to submit it along with the mediapackage to start...due to unsolved issue with some MH installs
    begin
      workflow_definition_xml = Rubyhorn.client.definition_xml(workflow_id)
      Rubyhorn.client.start({"definition" => workflow_definition_xml, "mediapackage" => mp.to_xml})
    rescue Rubyhorn::RestClient::Exceptions::HTTPNotFound
      raise StandardError, "Unable to start workflow"
    end
  end
end
fetch_workflow(id) click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 48
# Retrieves the workflow for +id+, falling back to the stopped-workflow
# lookup when the live instance is not found.
#
# Only HTTPNotFound is tolerated on the first lookup; any StandardError from
# the fallback lookup is swallowed.
#
# @param id [Object] workflow identifier
# @return [Object, nil] the workflow as returned by +get_workflow+, or nil
#   when neither lookup produced one
def fetch_workflow(id)
  located =
    begin
      Rubyhorn.client.instance_xml(id)
    rescue Rubyhorn::RestClient::Exceptions::HTTPNotFound
      nil
    end

  unless located
    located =
      begin
        Rubyhorn.client.get_stopped_workflow(id)
      rescue StandardError
        nil
      end
  end

  get_workflow(located)
end
get_media_package(workflow) click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 163
# Selects the workflow's <mediapackage> nodes and stamps the Opencast media
# package namespace onto the first one as an +xmlns+ attribute.
#
# @param workflow [Object] workflow node responding to +xpath+
# @return [Object] the full xpath result (not just the first node)
def get_media_package(workflow)
  packages = workflow.xpath('//mediapackage')
  packages.first['xmlns'] = 'http://mediapackage.opencastproject.org'
  packages
end
get_workflow(workflow_om) click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 64
# Unwraps the XML held by a Rubyhorn response object.
#
# When +ng_xml+ is a whole Nokogiri document, its namespaces are stripped
# (in place) and the root element is returned; anything else is returned
# as is.
#
# @param workflow_om [Object, nil] object responding to +ng_xml+
# @return [Object, nil] the workflow node, or nil for nil input
def get_workflow(workflow_om)
  return nil if workflow_om.nil?

  xml = workflow_om.ng_xml
  return xml unless xml.is_a?(Nokogiri::XML::Document)

  xml.remove_namespaces!.root
end
purge_output(workflow, track_id) click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 182
# Deletes one delivery track through the Matterhorn client and mirrors the
# outcome in the in-memory workflow.
#
# Tracks tagged "hls" are deleted with +delete_hls_track+, all others with
# +delete_track+. The job status is read once, shortly after the request:
# on FINISHED the track node is removed from +workflow+; on FAILED an
# <error> naming the track's quality label is appended under <errors>.
# Any other status leaves +workflow+ untouched.
#
# @param workflow [Object] workflow node responding to +xpath+ / +at_xpath+
# @param track_id [String] id of the track to delete
# @return [Object, nil] result of the node removal / insertion, or nil when
#   the job was neither FINISHED nor FAILED
def purge_output(workflow, track_id)
  media_package = get_media_package(workflow)
  hls = workflow.xpath("//track[@id='#{track_id}']/tags/tag[text()='hls']").present?
  job_url = if hls
    Rubyhorn.client.delete_hls_track(media_package, track_id)
  else
    Rubyhorn.client.delete_track(media_package, track_id)
  end
  # Brief pause before polling the job status.
  sleep(0.1)
  job_status = Nokogiri::XML(Rubyhorn.client.get(URI(job_url).path)).root.attribute("status").value()
  #FIXME have this return a boolean based upon result of operation
  case job_status
  when "FINISHED"
    workflow.at_xpath("//track[@id=\"#{track_id}\"]").remove
  when "FAILED"
    # Look the label up on the workflow itself; this branch used to reference
    # an undefined local (+mp+) and raised NameError instead of recording the
    # failure.
    label = workflow.at_xpath("//*[@id=\"#{track_id}\"]/tags/tag[starts-with(text(),\"quality\")]/text()").to_s
    workflow.at_xpath('//errors').add_child("<error>Output not purged: #{label}</error>")
  end
end
purge_outputs(workflow) click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 170
# Purges every streaming delivery track of the workflow, ignoring failures
# of individual tracks.
#
# HLS tracks go first because the second, more general query matches them as
# well. Each query is evaluated only when its turn comes, i.e. the general
# one runs after the HLS purges have been attempted.
#
# @param workflow [Object] workflow node responding to +xpath+
# @return [Object] the same +workflow+ object
def purge_outputs(workflow)
  best_effort_purge = lambda do |track_id|
    begin
      purge_output(workflow, track_id)
    rescue StandardError
      nil
    end
  end

  [
    '//track[@type="presenter/delivery" and tags/tag[text()="streaming"] and tags/tag[text()="hls"]]/@id',
    '//track[@type="presenter/delivery" and tags/tag[text()="streaming"]]/@id'
  ].each do |query|
    workflow.xpath(query).map(&:to_s).each(&best_effort_purge)
  end

  workflow
end