class Embulk::Input::Buildkite

Constants

MAX_RETRY

Public Class Methods

resume(task, columns, count) { |task, columns, count| ... }
# File lib/embulk/input/buildkite.rb, line 47
# Hands the task to Embulk's control block and returns the config diff for
# the next run.
#
# @param task [Hash] task settings built by .transaction
# @param columns [Array] output schema columns
# @param count [Integer] number of tasks to run
# @yield [task, columns, count] Embulk's control block, which executes the tasks
# @return [Hash] always an empty config diff (no state is carried to the next run)
def self.resume(task, columns, count, &control)
  # The task reports returned by the control block are not used.
  yield(task, columns, count)
  {}
end
transaction(config, &control)
# File lib/embulk/input/buildkite.rb, line 24
# Reads the plugin settings, declares the output schema, and delegates to
# .resume with a single task.
#
# Task keys: org_slug, pipeline_slug, build_nums, token, and
# artifact_download_concurrency (default 10).
# Output columns, in order: id, data, log, artifacts, created_at,
# build_number, build_data.
#
# @param config [Object] Embulk config object (read via #param)
# @return whatever .resume returns
def self.transaction(config, &control)
  required_settings = [
    ["org_slug", :string],
    ["pipeline_slug", :string],
    ["build_nums", :array],
    ["token", :string],
  ]
  task = required_settings.each_with_object({}) do |(key, type), acc|
    acc[key] = config.param(key, type)
  end
  task["artifact_download_concurrency"] = config.param("artifact_download_concurrency", :integer, default: 10)

  column_specs = [
    ["id", :string],
    ["data", :string],
    ["log", :string],
    ["artifacts", :string],
    ["created_at", :timestamp],
    ["build_number", :long],
    ["build_data", :string],
  ]
  schema = column_specs.each_with_index.map do |(name, type), index|
    Column.new(index, name, type)
  end

  resume(task, schema, 1, &control)
end

Public Instance Methods

init()

TODO def self.guess(config)

sample_records = [
  {"example"=>"a", "column"=>1, "value"=>0.1},
  {"example"=>"a", "column"=>2, "value"=>0.2},
]
columns = Guess::SchemaGuess.from_hash_records(sample_records)
return {"columns" => columns}

end

# File lib/embulk/input/buildkite.rb, line 64
# Initialization hook; this plugin needs no per-instance setup.
def init; end
run()
# File lib/embulk/input/buildkite.rb, line 68
# Fetches every configured build and emits one record per job.
#
# For each build number in task['build_nums'] the build is fetched. For each
# of its jobs, the job log, the artifact list and every artifact's body are
# fetched; bodies are downloaded by a small pool of threads. Each job becomes
# one row: id, job JSON, log JSON, artifacts JSON (including bodies),
# created_at, build_number and the whole build as JSON. Every API call goes
# through with_retry. The page builder is flushed after each row and finished
# at the end.
#
# @return [Hash] an empty task report
def run
  task['build_nums'].each do |build_num|
    logger.info("Start build_num:[#{build_num}]")

    build = with_retry { client.fetch_build(number: build_num) }
    build[:jobs].each do |job|
      logger.info("Start job_id:[#{job[:id]}]")
      log = with_retry { client.fetch_log(build_number: job[:build_number], job_id: job[:id]) }
      artifacts = with_retry { client.fetch_artifacts(build_number: job[:build_number], job_id: job[:id]) }

      # Download artifact bodies concurrently. Each worker sets :body on the
      # artifact hash in place, so artifacts.to_json below includes the bodies.
      unless artifacts.empty?
        queue = Queue.new
        artifacts.each { |artifact| queue << artifact }
        # A closed queue makes #pop return nil once it is drained, so workers
        # exit normally rather than via ThreadError from a non-blocking pop.
        queue.close

        # At least one worker, so a concurrency setting of 0 (or below) still
        # downloads the bodies. At most one worker per artifact, so no idle
        # threads are spawned.
        worker_count = [[task['artifact_download_concurrency'], artifacts.size].min, 1].max
        workers = Array.new(worker_count) do
          Thread.new do
            while (artifact = queue.pop)
              artifact[:body] = with_retry {
                client.fetch_artifact(build_number: job[:build_number], job_id: job[:id], artifact_id: artifact[:id]).encode("utf-8", invalid: :replace, undef: :replace)
              }
            end
          end
        end
        # Thread#join re-raises an exception raised in a worker (e.g. after
        # with_retry gives up), which aborts the run.
        workers.each(&:join)
      end

      page_builder.add([
        job[:id],
        job.to_json,
        log.to_json,
        artifacts.to_json,
        Time.parse(job[:created_at]),
        job[:build_number],
        build.to_json,
      ])

      page_builder.flush
    end
  end

  page_builder.finish

  {}
end

Private Instance Methods

client()
# File lib/embulk/input/buildkite.rb, line 116
# Returns the memoized Buildkite API client, built from the task's token and
# org/pipeline slugs. The client is handed a ::Logger with a nil log device,
# which the stdlib Logger treats as "discard all output".
def client
  return @client if @client

  @client = UnofficialBuildkiteClient.new(
    access_token: task["token"],
    org_slug: task["org_slug"],
    pipeline_slug: task["pipeline_slug"],
    logger: ::Logger.new(nil),
  )
end
logger()
# File lib/embulk/input/buildkite.rb, line 120
# Returns the memoized plugin logger, which wraps Embulk.logger.
# NOTE(review): `Logger` is resolved relative to the enclosing namespace;
# confirm whether it is the stdlib ::Logger or a plugin-local adapter.
def logger
  @logger = Logger.new(Embulk.logger) unless @logger
  @logger
end
with_retry() { || ... }
# File lib/embulk/input/buildkite.rb, line 124
# Runs the block, retrying it when it raises a StandardError.
#
# Up to MAX_RETRY retries are made. Before retry number k (1-based) the
# method sleeps k - 1 seconds, so the first retry is immediate and the wait
# grows linearly. Each retry is logged as a warning. Once retries are
# exhausted, the failure is logged as an error and the last exception is
# re-raised immediately, with no pointless final sleep.
#
# @yield the operation to attempt
# @return whatever the block returns on its first successful attempt
# @raise the block's last exception once MAX_RETRY retries have failed
def with_retry
  retries = 0
  begin
    yield
  rescue => e
    if retries < MAX_RETRY
      # Linear backoff: 0s, 1s, 2s, ... before successive retries.
      sleep retries
      retries += 1
      logger.warn("retry ##{retries}, #{e.message}")
      retry
    else
      logger.error("retry exhausted ##{retries}, #{e.message}")
      raise
    end
  end
end