class Dag::Job
Attributes
access_key_id[R]
cluster[R]
cluster_rebooted[R]
dsl[R]
id[R]
input_format[R]
input_object_keys[R]
job_id[R]
label[R]
output_database[R]
output_format[R]
output_resource_path[R]
output_table[R]
process_engine[R]
progress[R]
query[R]
schema[R]
stage[R]
start_at[R]
status[R]
type[R]
Public Class Methods
new(api, job_info)
click to toggle source
Calls superclass method
Dag::Model::new
# File lib/dag/client/model/job.rb, line 3 def initialize(api, job_info) super(api) update_parameters(job_info) end
Public Instance Methods
cluster_rebooted?()
click to toggle source
# File lib/dag/client/model/job.rb, line 28 def cluster_rebooted? !!@cluster_rebooted end
download_urls(time_limit = 30)
click to toggle source
# File lib/dag/client/model/job.rb, line 44 def download_urls(time_limit = 30) raise Dag::Client::StatusInvalid.new("job status is not finished") unless finished? expire_at = time_limit.minutes.since.to_i object_uri = URI.parse(@output_resource_path) bucket = object_uri.host object_path = object_uri.path[1..-1] object_path += '/' unless object_path.end_with? '/' bucket_objects = @api.objects(bucket, prefix: object_path).objects bucket_objects.map do |object| path = if @api.force_path_style? "/#{bucket}/#{object}" else "/#{object}" end parameters = { "Expires" => expire_at, "IIJGIOAccessKeyId" => @api.apikey, "Signature" => @api.download_signature(expire_at, bucket, path) } uri = URI.parse(@api.storage_api) url = if @api.force_path_style? "http://#{uri.host}" else "http://#{bucket}.#{uri.host}" end url += ":#{uri.port}" unless uri.port == 80 || uri.port == 443 File.join(url, "#{path}?#{parameters.to_param}") end end
finished?()
click to toggle source
# File lib/dag/client/model/job.rb, line 12 def finished? @status == 'finished' end
hive?()
click to toggle source
# File lib/dag/client/model/job.rb, line 24 def hive? @type == 'select' end
kill()
click to toggle source
# File lib/dag/client/model/job.rb, line 37 def kill validate_cancel_condition @api.query_cancel(@id) end
Also aliased as: cancel
log()
click to toggle source
# File lib/dag/client/model/job.rb, line 77 def log validate_log_condition log_info = @api.query_log(@id) log_info ? log_info['log'] : '' end
reload()
click to toggle source
# File lib/dag/client/model/job.rb, line 32 def reload job_info = @api.query_info(@id) update_parameters(job_info) end
running?()
click to toggle source
# File lib/dag/client/model/job.rb, line 16 def running? @status == 'running' end
split?()
click to toggle source
# File lib/dag/client/model/job.rb, line 20 def split? @type == 'split' end
validate_cancel_condition()
click to toggle source
# File lib/dag/client/model/job.rb, line 93 def validate_cancel_condition unless running? raise Dag::Client::StatusInvalid.new("job status is not running") end end
validate_log_condition()
click to toggle source
# File lib/dag/client/model/job.rb, line 83 def validate_log_condition if split? raise Dag::Client::JobTypeInvalid.new("job type is not select") end if cluster_rebooted? raise Dag::Client::ClusterRebooted.new("cluster is rebooted") end end
Private Instance Methods
update_parameters(job)
click to toggle source
# File lib/dag/client/model/job.rb, line 101 def update_parameters(job) @id = job['id'] @status = job['status'] @process_engine = job['processEngine'] @dsl = job['dsl'] @cluster = job['clusterName'] @cluster_rebooted = job['clusterRebooted'] @start_at = Time.parse(job['startTime']) if job['startTime'] @access_key_id = job['accessKeyId'] @query = job['query'] @output_format = job['outputFormat'] @output_resource_path = job['outputResourcePath'] @type = job['type'] @label = job['label'] @stage = job['stage'] @progress = job['progress'] @job_id = job['jobId'] @schema = job['schema'] @input_object_keys = job['inputObjectKeys'] @input_format = job['inputFormat'] @output_database = job['outputDatabase'] @output_table = job['outputTable'] end