class PipeFitter::DataPipelineClient
Public Class Methods
new(options)
click to toggle source
# File lib/pipe_fitter/data_pipeline_client.rb, line 10
# Stores the given options in @options with every key converted via #to_sym.
#
# @param options [#map] key/value pairs (e.g. a Hash); each key must
#   respond to #to_sym
def initialize(options)
  symbolized_pairs = options.map do |name, value|
    [name.to_sym, value]
  end
  @options = symbolized_pairs.to_h
end
Public Instance Methods
activate(pipeline_id, parameter_file, start_timestamp)
click to toggle source
# File lib/pipe_fitter/data_pipeline_client.rb, line 49
# Sends an :activate_pipeline request for the given pipeline.
#
# The request options come from Pipeline#activate_opts, called on the
# pipeline loaded from +parameter_file+, or on a fresh Pipeline.new when
# +parameter_file+ is nil/false.
#
# @param pipeline_id identifier passed through to activate_opts
# @param parameter_file path handed to load_pipeline, or nil
# @param start_timestamp passed through to activate_opts
# @return the response converted with #to_h
def activate(pipeline_id, parameter_file, start_timestamp)
  pipeline =
    if parameter_file
      load_pipeline(parameter_file)
    else
      Pipeline.new
    end
  request = pipeline.activate_opts(pipeline_id, start_timestamp)
  exec(:activate_pipeline, request).to_h
end
create(pipeline)
click to toggle source
# File lib/pipe_fitter/data_pipeline_client.rb, line 44
# Sends a :create_pipeline request built from pipeline.create_opts, then
# puts the pipeline's definition onto the newly created pipeline.
#
# @param pipeline object responding to #create_opts (and whatever
#   put_definition requires of it)
# @return [Array] two elements: the new pipeline id and the result of
#   put_definition
def create(pipeline)
  created = exec(:create_pipeline, pipeline.create_opts)
  new_id = created.pipeline_id
  [new_id, put_definition(new_id, pipeline)]
end
definition(pipeline_id)
click to toggle source
# File lib/pipe_fitter/data_pipeline_client.rb, line 33
# Fetches the definition and the description of a pipeline and combines
# them through Pipeline.create.
#
# @param pipeline_id identifier of the pipeline to fetch
# @return whatever Pipeline.create returns for the two hashes
def definition(pipeline_id)
  definition_hash = exec(:get_pipeline_definition, pipeline_id: pipeline_id).to_h
  description_hash = description(pipeline_id).to_h
  Pipeline.create(definition_hash, description_hash)
end
diff(pipeline_id, definition_file, format = :color)
click to toggle source
# File lib/pipe_fitter/data_pipeline_client.rb, line 19
# Builds a single diff text covering both the pipeline definition and its
# deploy files.
#
# The remote definition is diffed against the pipeline loaded from
# +definition_file+, the deploy-file diffs are appended, nil and empty
# sections are dropped, and the rest is joined with newlines.
#
# @param pipeline_id identifier of the remote pipeline
# @param definition_file path handed to load_pipeline
# @param format [#to_sym] output format, :color by default
# @return [String] joined diff text ("" when nothing differs)
def diff(pipeline_id, definition_file, format = :color)
  local_pipeline = load_pipeline(definition_file)
  definition_diff = definition(pipeline_id).diff(local_pipeline, format.to_sym)
  deploy_file_diffs = diff_deploy_files(definition_file, format.to_sym)
  sections = [definition_diff, deploy_file_diffs].compact
  sections.reject(&:empty?).join("\n")
end
diff_deploy_files(definition_file, format = :color)
click to toggle source
# File lib/pipe_fitter/data_pipeline_client.rb, line 64
# Compares every deploy file of the pipeline loaded from +definition_file+
# using S3diff::Comparator and collects the rendered diffs of those that
# are not reported as the same.
#
# @param definition_file path handed to load_pipeline
# @param format [#to_sym] output format, :color by default
# @return [Array] one rendered diff per differing deploy file
def diff_deploy_files(definition_file, format = :color)
  pipeline = load_pipeline(definition_file)
  pipeline.deploy_files.each_with_object([]) do |deploy_file, diffs|
    comparator = S3diff::Comparator.new(deploy_file[:dst], deploy_file[:src], sdk_opts)
    next if comparator.same?

    rendered = comparator.diff.to_s(format.to_sym)
    # Skip nil results, matching a map followed by compact.
    diffs << rendered unless rendered.nil?
  end
end
find_registered(definition_file)
click to toggle source
# File lib/pipe_fitter/data_pipeline_client.rb, line 54
# Looks up the already-registered pipeline that corresponds to the
# definition file.
#
# Listed pipelines are first narrowed to those whose name equals the local
# pipeline description's name; among those, the first whose remote
# description has the same unique_id is returned.
#
# @param definition_file path handed to load_pipeline
# @return the matching entry from list_pipelines, or nil when none matches
def find_registered(definition_file)
  wanted = load_pipeline(definition_file)
  same_name = list_pipelines.select do |listed|
    listed.name == wanted.pipeline_description.name
  end
  same_name.find do |candidate|
    remote = Pipeline::PipelineDescription.create(description(candidate.id))
    remote.unique_id == wanted.pipeline_description.unique_id
  end
end
put_definition(pipeline_id, pipeline)
click to toggle source
# File lib/pipe_fitter/data_pipeline_client.rb, line 39
# Calls sync_tags for the pipeline, then sends a :put_pipeline_definition
# request built from pipeline.put_definition_opts.
#
# @param pipeline_id identifier of the target pipeline
# @param pipeline object responding to #put_definition_opts
# @return the response converted with #to_h
def put_definition(pipeline_id, pipeline)
  sync_tags(pipeline_id, pipeline)
  request = pipeline.put_definition_opts(pipeline_id)
  response = exec(:put_pipeline_definition, request)
  response.to_h
end
register(definition_file)
click to toggle source
# File lib/pipe_fitter/data_pipeline_client.rb, line 14
# Loads the pipeline described by +definition_file+ and creates it.
#
# @param definition_file path handed to load_pipeline
# @return the result of #create
def register(definition_file)
  create(load_pipeline(definition_file))
end
update(pipeline_id, definition_file)
click to toggle source
# File lib/pipe_fitter/data_pipeline_client.rb, line 27
# Uploads the deploy files of the definition, then puts the definition
# onto the existing pipeline.
#
# @param pipeline_id identifier of the pipeline to update
# @param definition_file path handed to load_pipeline
# @return the result of #put_definition
def update(pipeline_id, definition_file)
  # Deploy files go up first, before the definition is put.
  upload_deploy_files(definition_file)
  put_definition(pipeline_id, load_pipeline(definition_file))
end
upload_deploy_files(definition_file)
click to toggle source
# File lib/pipe_fitter/data_pipeline_client.rb, line 72
# Uploads each deploy file of the pipeline loaded from +definition_file+,
# sending its :src to its :dst via put_object.
#
# @param definition_file path handed to load_pipeline
# @return the value of deploy_files#each (presumably the deploy_files
#   collection itself)
def upload_deploy_files(definition_file)
  load_pipeline(definition_file).deploy_files.each { |entry| put_object(entry[:src], entry[:dst]) }
end
Private Instance Methods
client()
click to toggle source
# File lib/pipe_fitter/data_pipeline_client.rb, line 124
# Returns the memoized Aws::DataPipeline::Client, constructing it from
# sdk_opts on first use.
def client
  return @client if @client

  @client = Aws::DataPipeline::Client.new(sdk_opts)
end
description(pipeline_id)
click to toggle source
# File lib/pipe_fitter/data_pipeline_client.rb, line 85
# Fetches the description of a single pipeline via :describe_pipelines.
#
# @param pipeline_id identifier of the pipeline to describe
# @return the first element of the response's pipeline_description_list
# @raise [NoSuchPipelineError] when the list has no first element
def description(pipeline_id)
  response = exec(:describe_pipelines, pipeline_ids: [pipeline_id])
  found = response.pipeline_description_list.first
  return found unless found.nil?

  raise NoSuchPipelineError, pipeline_id
end
exec(method, *args)
click to toggle source
# File lib/pipe_fitter/data_pipeline_client.rb, line 108
# Forwards a call to the Data Pipeline client, translating the SDK's
# "pipeline not found" and "pipeline deleted" errors into
# NoSuchPipelineError.
#
# @param method [Symbol] name of the client method to invoke
# @param args arguments forwarded to that method
# @return whatever the client method returns
# @raise [NoSuchPipelineError] carrying the original error class followed
#   by the call arguments
def exec(method, *args)
  begin
    client.send(method, *args)
  rescue Aws::DataPipeline::Errors::PipelineNotFoundException,
         Aws::DataPipeline::Errors::PipelineDeletedException => missing
    raise NoSuchPipelineError, [missing.class, *args]
  end
end
list_pipelines()
click to toggle source
# File lib/pipe_fitter/data_pipeline_client.rb, line 91
# Collects the pipeline_id_list entries of every :list_pipelines page.
#
# Keeps requesting the next page, passing the previous response's marker,
# for as long as the latest response reports has_more_results.
#
# @return the first page's pipeline_id_list with all later pages appended
def list_pipelines
  page = exec(:list_pipelines)
  collected = page.pipeline_id_list
  loop do
    break unless page.has_more_results

    page = exec(:list_pipelines, marker: page.marker)
    collected.concat(page.pipeline_id_list)
  end
  collected
end
load_pipeline(definition_file)
click to toggle source
# File lib/pipe_fitter/data_pipeline_client.rb, line 81
# Loads a pipeline from a definition file by delegating to
# Pipeline.load_yaml.
#
# @param definition_file passed unchanged to Pipeline.load_yaml
# @return whatever Pipeline.load_yaml returns
def load_pipeline(definition_file)
  Pipeline.load_yaml(definition_file)
end
put_object(src, dst)
click to toggle source
# File lib/pipe_fitter/data_pipeline_client.rb, line 114
# Uploads a local file to the S3 location named by a URI string and prints
# a confirmation line to stdout.
#
# The file is opened in binary mode and handed to the client as an IO, so
# its bytes are sent unmodified (no text-mode newline translation) and the
# content is not first read into a String.
#
# @param src [String] path of the local file to upload
# @param dst [String] destination URI; its host is used as the bucket and
#   its path, minus one leading "/", as the object key
# @return [nil]
def put_object(src, dst)
  u = URI.parse(dst)
  File.open(src, "rb") do |file|
    s3client.put_object(
      body: file,
      bucket: u.host,
      # \A anchors at the start of the string, not of any line.
      key: u.path.sub(%r{\A/}, "")
    )
  end
  puts "put #{src} to #{dst}"
end
s3client()
click to toggle source
# File lib/pipe_fitter/data_pipeline_client.rb, line 128
# Returns the memoized Aws::S3::Client, constructing it from sdk_opts on
# first use.
def s3client
  return @s3client if @s3client

  @s3client = Aws::S3::Client.new(sdk_opts)
end
sdk_opts()
click to toggle source
# File lib/pipe_fitter/data_pipeline_client.rb, line 132
# Picks out of @options the entries that are passed to the AWS SDK client
# constructors: only the :region and :profile keys are kept.
#
# @return [Hash] subset of @options limited to :region and :profile
def sdk_opts
  sdk_keys = %i(region profile).freeze
  @options.select do |name, _value|
    sdk_keys.include?(name)
  end
end