class ActiveEncode::EngineAdapters::MatterhornAdapter
Constants
- DEFAULT_ARGS
Public Instance Methods
cancel(encode)
click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 26
# Stops the workflow whose id is encode.id through the Rubyhorn client and
# returns an encode of the same class as +encode+, built from the response.
# Returns nil when the response yields no workflow (see build_encode).
def cancel(encode)
  stopped = Rubyhorn.client.stop(encode.id)
  workflow = get_workflow(stopped)
  build_encode(workflow, encode.class)
end
create(encode)
click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 8
# Submits the encode's input for processing and returns an encode of the same
# class built from the resulting workflow.
#
# The workflow id is encode.options[:preset], falling back to "full".
# A Hash input is handed to createMultipleFiles; any other input is submitted
# as a single URL, with its basename used as both file name and title.
def create(encode)
  workflow_id = encode.options[:preset] || "full"
  source = encode.input

  workflow_om =
    case source
    when Hash
      createMultipleFiles(source, workflow_id)
    else
      basename = File.basename(source)
      request = DEFAULT_ARGS.merge(
        'workflow' => workflow_id,
        'url' => source,
        'filename' => basename,
        'title' => basename
      )
      Rubyhorn.client.addMediaPackageWithUrl(request)
    end

  build_encode(get_workflow(workflow_om), encode.class)
end
find(id, opts = {})
click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 18
# Looks up the workflow with the given id and builds an encode from it,
# instantiating the class passed as opts[:cast].
# Returns nil when no workflow could be fetched (see build_encode).
def find(id, opts = {})
  workflow = fetch_workflow(id)
  build_encode(workflow, opts[:cast])
end
list(*filters)
click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 22
# Listing encodes is not supported by this adapter: every call raises
# NotImplementedError, whatever filters are given.
# TODO: implement this
def list(*_filters)
  raise NotImplementedError
end
purge(encode)
click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 31
# Stops the encode's workflow and purges its streaming outputs.
#
# @param encode [Object] must respond to #id; its class is used to build the result
# @return [Object, nil] an encode built from the purged workflow, or nil when
#   neither client call produced a workflow
def purge(encode)
  # Both lookups are best effort: any error is swallowed and treated as
  # "no workflow"; the stopped-workflow lookup only runs when stop gave nothing.
  workflow_om = Rubyhorn.client.stop(encode.id) rescue nil
  workflow_om ||= Rubyhorn.client.get_stopped_workflow(encode.id) rescue nil

  workflow = get_workflow(workflow_om)
  # Without this guard a missing workflow reached purge_outputs(nil) and
  # raised NoMethodError; build_encode already maps a nil workflow to nil.
  return nil if workflow.nil?

  purged_workflow = purge_outputs(workflow)
  # Rubyhorn.client.delete_instance(encode.id) # Delete is not working so workflow instances can always be retrieved later!
  build_encode(purged_workflow, encode.class)
end
remove_output(encode, output_id)
click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 39
# Removes a single output from the encode's workflow.
#
# @param encode [Object] must respond to #id and #output (a collection of
#   hash-like outputs with an :id entry)
# @param output_id [Object] id of the output to remove
# @return [Object, nil] the matching output, or nil when the encode has no
#   such output or its workflow cannot be fetched
def remove_output(encode, output_id)
  # Check the local output list first so an unknown id costs no remote call.
  output = encode.output.find { |candidate| candidate[:id] == output_id }
  return if output.nil?

  workflow = fetch_workflow(encode.id)
  # fetch_workflow returns nil when the workflow cannot be retrieved;
  # purge_output cannot operate on nil, so there is nothing to remove.
  return if workflow.nil?

  purge_output(workflow, output_id)
  output
end
Private Instance Methods
build_encode(workflow, cast)
click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 73
# Builds an encode of class +cast+ from a workflow node.
# The instance is created from the converted input and options, then its id,
# state, current operations, percent complete, output, errors and technical
# metadata are filled in from the workflow. Returns nil for a nil workflow.
def build_encode(workflow, cast)
  return nil if workflow.nil?

  cast.new(convert_input(workflow), convert_options(workflow)).tap do |encode|
    encode.id = convert_id(workflow)
    encode.state = convert_state(workflow)
    encode.current_operations = convert_current_operations(workflow)
    encode.percent_complete = calculate_percent_complete(workflow)
    encode.output = convert_output(workflow, encode.options)
    encode.errors = convert_errors(workflow)
    encode.tech_metadata = convert_tech_metadata(workflow)
  end
end
calculate_percent_complete(workflow)
click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 201
# Estimates overall progress as a weighted sum over three groups of workflow
# operations: "compose" operations (weight 70), operations whose id starts
# with "distribute" (weight 20) and all remaining operations (weight 10).
# An operation counts as completed when its state is SUCCEEDED or SKIPPED.
# A group with no operations is treated as having one, so it contributes 0.
# Returns a Float.
def calculate_percent_complete(workflow)
  # Each row: [weight, xpath of completed operations, xpath of all operations]
  groups = [
    [70,
     '//operation[@id="compose" and (@state="SUCCEEDED" or @state="SKIPPED")]',
     '//operation[@id="compose"]'],
    [20,
     '//operation[starts-with(@id,"distribute") and (@state="SUCCEEDED" or @state="SKIPPED")]',
     '//operation[starts-with(@id,"distribute")]'],
    [10,
     '//operation[@id!="compose" and not(starts-with(@id,"distribute")) and (@state="SUCCEEDED" or @state="SKIPPED")]',
     '//operation[@id!="compose" and not(starts-with(@id,"distribute"))]']
  ]

  transcode, distribution, other = groups.map do |weight, completed_xpath, all_xpath|
    completed = workflow.xpath(completed_xpath).size
    total = workflow.xpath(all_xpath).size
    total = 1 if total == 0 # avoid dividing by zero for an empty group
    (weight.to_f / total) * completed
  end

  transcode + distribution + other
end
convert_current_operations(workflow)
click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 126
# Returns a one-element array holding the description of the last operation
# whose state is not INSTANTIATED, or an empty array when there is no such
# description (or it is blank).
def convert_current_operations(workflow)
  descriptions = workflow.xpath('//operation[@state!="INSTANTIATED"]/@description')
  latest = descriptions.last.to_s
  return [] unless latest.present?

  [latest]
end
convert_errors(workflow)
click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 131
# Collects the text of every //errors/error element in the workflow as an
# array of strings.
def convert_errors(workflow)
  error_texts = workflow.xpath('//errors/error/text()')
  error_texts.map { |text_node| text_node.to_s }
end
convert_id(workflow)
click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 86
# Returns the workflow's "id" attribute as a string.
def convert_id(workflow)
  id_attribute = workflow.attribute('id')
  id_attribute.to_s
end
convert_input(workflow)
click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 103
# Returns, as a string, the URL text of the media package's
# "presenter/source" track.
def convert_input(workflow)
  # Need to do anything else since this is a MH url? and this disappears when a workflow is cleaned up
  source_url = workflow.xpath('mediapackage/media/track[@type="presenter/source"]/url/text()')
  source_url.to_s
end
convert_options(workflow)
click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 135
# Builds the encode options hash from the workflow:
#   :preset      - text of the workflow's template element
#   :stream_base - text of the "avalon.stream_base" property, set only when
#                  that property is present (this is avalon-felix specific)
def convert_options(workflow)
  preset = workflow.xpath('template/text()').to_s
  stream_base = workflow.xpath('//properties/property[@key="avalon.stream_base"]/text()')

  options = { preset: preset }
  options[:stream_base] = stream_base.to_s if stream_base.present?
  options
end
convert_output(workflow, options)
click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 112
# Returns an array with one hash per "presenter/delivery" track tagged
# "streaming". Each hash holds the track's technical metadata plus its :id,
# :url and :label (the text of the tag starting with "quality").
# When the url starts with "rtmp" and options[:stream_base] is set, the url is
# rewritten to the stream base joined with the path parsed from the rtmp url.
def convert_output(workflow, options)
  streaming_tracks = workflow.xpath('//track[@type="presenter/delivery" and tags/tag[text()="streaming"]]')

  streaming_tracks.map do |track|
    label = track.xpath('tags/tag[starts-with(text(),"quality")]/text()').to_s
    url = track.at("url/text()").to_s
    if url.start_with?("rtmp") && options[:stream_base]
      url = File.join(options[:stream_base], MatterhornRtmpUrl.parse(url).to_path)
    end
    track_id = track.at("@id").to_s

    convert_track_metadata(track).merge({ id: track_id, url: url, label: label })
  end
end
convert_state(workflow)
click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 90
# Maps the workflow's "state" attribute to an encode state symbol:
#   INSTANTIATED, RUNNING -> :running
#   STOPPED               -> :cancelled
#   FAILED                -> :failed when some operation is FAILED, else :cancelled
#   SUCCEEDED, SKIPPED    -> :completed
# Any other state yields nil.
def convert_state(workflow)
  state = workflow.attribute('state').to_s

  if %w[INSTANTIATED RUNNING].include?(state) # Should there be a queued state?
    :running
  elsif state == "STOPPED"
    :cancelled
  elsif state == "FAILED"
    failed_operations = workflow.xpath('//operation[@state="FAILED"]')
    failed_operations.empty? ? :cancelled : :failed
  elsif %w[SUCCEEDED SKIPPED].include?(state) # Should there be a errored state?
    :completed
  end
end
convert_tech_metadata(workflow)
click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 108
# Returns the technical metadata of the first "presenter/source" track in the
# workflow (an empty hash when there is no such track, see
# convert_track_metadata).
def convert_tech_metadata(workflow)
  source_track = workflow.xpath('//track[@type="presenter/source"]').first
  convert_track_metadata(source_track)
end
convert_track_metadata(track)
click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 142
# Extracts technical metadata from a track node into a hash of strings.
# :mime_type, :checksum and :duration are set only when the matching element
# exists; the audio_* keys only when the track has an audio element; the
# video_* keys plus :width and :height only when it has a video element.
# Width and height are the parts of the resolution text around "x".
# Returns an empty hash for a nil track.
def convert_track_metadata(track)
  return {} if track.nil?

  metadata = {}

  { mime_type: 'mimetype', checksum: 'checksum', duration: 'duration' }.each do |key, element|
    metadata[key] = track.at("#{element}/text()").to_s if track.at(element)
  end

  if track.at('audio')
    metadata.merge!(
      audio_codec: track.at("audio/encoder/@type").to_s,
      audio_channels: track.at("audio/channels/text()").to_s,
      audio_bitrate: track.at("audio/bitrate/text()").to_s
    )
  end

  if track.at('video')
    metadata.merge!(
      video_codec: track.at("video/encoder/@type").to_s,
      video_bitrate: track.at("video/bitrate/text()").to_s,
      video_framerate: track.at("video/framerate/text()").to_s
    )
    width, height = track.at("video/resolution/text()").to_s.split('x')
    metadata[:width] = width
    metadata[:height] = height
  end

  metadata
end
createMultipleFiles(input, workflow_id)
click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 223
# Ingests several files (one per quality level) as a single media package and
# starts the given workflow on it.
#
# @param input [Hash] quality label => file url; the basename of the first url
#   becomes the media package title
# @param workflow_id [String] id of the workflow definition to start
# @return [Object] whatever Rubyhorn.client.start returns
# @raise [StandardError] "Unable to start workflow" when the fallback start
#   fails with HTTPNotFound
def createMultipleFiles(input, workflow_id)
  # Create empty media package xml document
  mp = Rubyhorn.client.createMediaPackage

  # Next line associates workflow title to avalon via masterfile pid
  title = File.basename(input.values.first)
  # The title is escaped because file names may contain &, < or >, which would
  # otherwise produce malformed catalog markup.
  dc_markup = '<dublincore xmlns="http://www.opencastproject.org/xsd/1.0/dublincore/" ' \
              'xmlns:dcterms="http://purl.org/dc/terms/" ' \
              'xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">' \
              "<dcterms:title>#{title.encode(xml: :text)}</dcterms:title></dublincore>"
  dc = Nokogiri::XML(dc_markup)
  mp = Rubyhorn.client.addDCCatalog({'mediaPackage' => mp.to_xml, 'dublinCore' => dc.to_xml, 'flavor' => 'dublincore/episode'})

  # Add quality levels - repeated for each supplied file url
  input.each_pair do |quality, url|
    mp = Rubyhorn.client.addTrack({'mediaPackage' => mp.to_xml, 'url' => url, 'flavor' => DEFAULT_ARGS['flavor']})
    # Rewrite track to include quality tag
    # Get the empty tags element under the newly added track
    tags = mp.xpath('//xmlns:track/xmlns:tags[not(node())]', 'xmlns' => 'http://mediapackage.opencastproject.org').first
    quality_tag = Nokogiri::XML::Node.new 'tag', mp
    quality_tag.content = quality
    tags.add_child quality_tag
  end

  # Finally ingest the media package
  begin
    Rubyhorn.client.start({"definitionId" => workflow_id, "mediapackage" => mp.to_xml})
  rescue Rubyhorn::RestClient::Exceptions::HTTPBadRequest
    # make this two calls...one to get the workflow definition xml and then the second to submit it
    # along with the mediapackage to start...due to unsolved issue with some MH installs
    begin
      workflow_definition_xml = Rubyhorn.client.definition_xml(workflow_id)
      Rubyhorn.client.start({"definition" => workflow_definition_xml, "mediapackage" => mp.to_xml})
    rescue Rubyhorn::RestClient::Exceptions::HTTPNotFound
      raise StandardError, "Unable to start workflow"
    end
  end
end
fetch_workflow(id)
click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 48
# Fetches the workflow with the given id and returns it via get_workflow.
# The workflow instance is requested first (an HTTPNotFound counts as
# "no result"); when that gives nothing, the stopped workflow is requested,
# with any StandardError treated as "no result". Returns nil when both fail.
def fetch_workflow(id)
  found = nil

  begin
    found = Rubyhorn.client.instance_xml(id)
  rescue Rubyhorn::RestClient::Exceptions::HTTPNotFound
    found = nil
  end

  unless found
    begin
      found = Rubyhorn.client.get_stopped_workflow(id)
    rescue StandardError
      found = nil
    end
  end

  get_workflow(found)
end
get_media_package(workflow)
click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 163
# Returns the workflow's //mediapackage nodes after setting the "xmlns"
# attribute of the first one to the Opencast media package namespace.
def get_media_package(workflow)
  workflow.xpath('//mediapackage').tap do |packages|
    packages.first['xmlns'] = 'http://mediapackage.opencastproject.org'
  end
end
get_workflow(workflow_om)
click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 64
# Unwraps the XML held by a workflow object (its #ng_xml).
# For a Nokogiri::XML::Document the namespaces are stripped and the root node
# is returned; any other value is returned as is. nil input gives nil.
def get_workflow(workflow_om)
  return nil if workflow_om.nil?

  xml = workflow_om.ng_xml
  return xml unless xml.is_a?(Nokogiri::XML::Document)

  xml.remove_namespaces!.root
end
purge_output(workflow, track_id)
click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 182
# Asks the server to delete one output track and mirrors the outcome in the
# in-memory workflow.
#
# Tracks tagged "hls" are deleted with delete_hls_track, all others with
# delete_track. The status of the returned job is read once, shortly after
# the request:
#   "FINISHED" - the track node is removed from the workflow
#   "FAILED"   - an <error> naming the track's quality label is appended to
#                the workflow's errors element
#   otherwise  - the workflow is left unchanged
#
# @param workflow [Object] workflow node to update
# @param track_id [String] id of the track to delete
def purge_output(workflow, track_id)
  media_package = get_media_package(workflow)
  hls = workflow.xpath("//track[@id='#{track_id}']/tags/tag[text()='hls']").present?
  job_url =
    if hls
      Rubyhorn.client.delete_hls_track(media_package, track_id)
    else
      Rubyhorn.client.delete_track(media_package, track_id)
    end

  # Short pause before the single status poll below.
  sleep(0.1)
  job_status = Nokogiri::XML(Rubyhorn.client.get(URI(job_url).path)).root.attribute("status").value

  # FIXME have this return a boolean based upon result of operation
  case job_status
  when "FINISHED"
    workflow.at_xpath("//track[@id=\"#{track_id}\"]").remove
  when "FAILED"
    # The label is read from media_package; this branch used to reference an
    # undefined local and raised NameError instead of recording the error.
    label = media_package.at_xpath("//*[@id=\"#{track_id}\"]/tags/tag[starts-with(text(),\"quality\")]/text()")
    workflow.at_xpath('//errors').add_child("<error>Output not purged: #{label}</error>")
  end
end
purge_outputs(workflow)
click to toggle source
# File lib/active_encode/engine_adapters/matterhorn_adapter.rb, line 170
# Purges every streaming delivery track of the workflow and returns the
# workflow. A failure while purging one track is ignored so the remaining
# tracks are still attempted.
def purge_outputs(workflow)
  # Delete hls tracks first since the next, more general xpath matches them as well.
  # Each xpath is evaluated only when its turn comes, i.e. after the previous
  # pass has finished purging.
  id_queries = [
    '//track[@type="presenter/delivery" and tags/tag[text()="streaming"] and tags/tag[text()="hls"]]/@id',
    '//track[@type="presenter/delivery" and tags/tag[text()="streaming"]]/@id'
  ]

  id_queries.each do |id_query|
    workflow.xpath(id_query).map(&:to_s).each do |track_id|
      begin
        purge_output(workflow, track_id)
      rescue StandardError
        nil
      end
    end
  end

  workflow
end