class Embulk::Output::Documentdb
Public Class Methods
transaction(config, schema, count) { |task| ... }
click to toggle source
# File lib/embulk/output/documentdb.rb, line 14
# Validates the plugin configuration, builds the task hash shared by every
# output task, runs the tasks (via the block) and logs their reports.
#
# @param config object answering #param(name, type, default: ...) -- the
#   Embulk plugin configuration
# @param schema input schema (not used here)
# @param count number of output tasks (not used here)
# @yieldparam task [Hash] the validated task configuration
# @yieldreturn task reports; they are only logged (as JSON)
# @return [Hash] the next config diff (always empty)
# @raise [ConfigError] when a required parameter is empty, when
#   partition_key is missing in partitioned collection mode, or when
#   offer_throughput is below AzureDocumentDB::PARTITIONED_COLL_MIN_THROUGHPUT
#   while auto_create_collection is enabled in that mode
def self.transaction(config, schema, count, &control)
  # configuration code:
  task = {
    'docdb_endpoint'         => config.param('docdb_endpoint', :string),
    'docdb_account_key'      => config.param('docdb_account_key', :string),
    'docdb_database'         => config.param('docdb_database', :string),
    'docdb_collection'       => config.param('docdb_collection', :string),
    'auto_create_database'   => config.param('auto_create_database', :bool, :default => true),
    'auto_create_collection' => config.param('auto_create_collection', :bool, :default => true),
    'partitioned_collection' => config.param('partitioned_collection', :bool, :default => false),
    'partition_key'          => config.param('partition_key', :string, :default => nil),
    'offer_throughput'       => config.param('offer_throughput', :integer, :default => AzureDocumentDB::PARTITIONED_COLL_MIN_THROUGHPUT),
    'key_column'             => config.param('key_column', :string),
  }
  Embulk.logger.info "transaction start"

  # param validation (checked in this order; to_s also rejects nil values)
  %w[docdb_endpoint docdb_account_key docdb_database docdb_collection key_column].each do |name|
    raise ConfigError, "no #{name}" if task[name].to_s.empty?
  end

  if task['partitioned_collection']
    # partition_key defaults to nil, so it must be validated from the task
    # hash (the former @partition_key was never assigned and always nil).
    if task['partition_key'].to_s.empty?
      raise ConfigError, 'partition_key must be set in partitioned collection mode'
    end
    if task['auto_create_collection'] && task['offer_throughput'] < AzureDocumentDB::PARTITIONED_COLL_MIN_THROUGHPUT
      raise ConfigError, sprintf("offer_throughput must be more than and equals to %s", AzureDocumentDB::PARTITIONED_COLL_MIN_THROUGHPUT)
    end
  end

  # resumable output:
  # resume(task, schema, count, &control)
  # non-resumable output:
  task_reports = yield(task)
  Embulk.logger.info "Documentdb output finished. Task reports = #{task_reports.to_json}"
  {} # next_config_diff
end
Public Instance Methods
abort()
click to toggle source
# File lib/embulk/output/documentdb.rb, line 155
# Embulk abort hook. Intentionally a no-op: nothing is rolled back here, so
# documents already written by add remain in the collection.
def abort; end
add(page)
click to toggle source
Called for each page in each task.
# File lib/embulk/output/documentdb.rb, line 114
# Called for each page in each task: writes every record of the page to the
# DocumentDB collection as one document.
#
# The value of the task's key_column becomes the document "id", always as a
# String. When key_column is not "id", the original column is removed from
# the document. Records whose key_column is absent or nil are skipped with a
# warning. Per-record write failures are logged and never abort the task;
# commit reports them, together with skipped records, as "skip_or_error".
#
# @param page enumerable of records; each record is assumed to be an array of
#   values in schema column order (NOTE(review): confirm against Embulk Page)
def add(page)
  key_column = @task['key_column']
  # Column names are fixed for the task, so look them up once per page
  # rather than once per record.
  column_names = schema.names

  # output code:
  page.each do |record|
    hash = Hash[column_names.zip(record)]
    @recordnum += 1

    # Covers both a missing column and a nil value; a nil key would otherwise
    # be stringified into an empty document id.
    if hash[key_column].nil?
      Embulk.logger.warn { "Skip Invalid Record: no key_column, data=>" + hash.to_json }
      next
    end

    unique_doc_id = hash[key_column].to_s
    hash.delete(key_column) if key_column != 'id'
    # force primary key to be both named "id" and "string" type
    hash['id'] = unique_doc_id

    begin
      if @task['partitioned_collection']
        @client.create_document(@coll_resource, unique_doc_id, hash, @task['partition_key'])
      else
        @client.create_document(@coll_resource, unique_doc_id, hash)
      end
      @successnum += 1
    rescue RestClient::ExceptionWithResponse => rcex
      # The response body can be nil (e.g. on timeouts) or not JSON at all.
      # An exception raised here would escape this rescue clause and abort
      # the whole task, so parse defensively.
      exdict = begin
        JSON.parse(rcex.response.to_s)
      rescue JSON::ParserError
        nil
      end
      if exdict.is_a?(Hash) && exdict['code'] == 'Conflict'
        Embulk.logger.error { "Duplicate Error: doc id (#{unique_doc_id}) already exists, data=>" + hash.to_json }
      else
        Embulk.logger.error { "RestClient Error: '#{rcex.response}', data=>" + hash.to_json }
      end
    rescue => ex
      Embulk.logger.error { "UnknownError: '#{ex}', doc id=>#{unique_doc_id}, data=>" + hash.to_json }
    end
  end
end
close()
click to toggle source
# File lib/embulk/output/documentdb.rb, line 110
# Embulk close hook. Intentionally a no-op: this plugin releases nothing
# explicitly when a task is closed.
def close; end
commit()
click to toggle source
# File lib/embulk/output/documentdb.rb, line 158
# Builds the task report returned to Embulk.
#
# @return [Hash] "total_records" (records seen by add), "success" (documents
#   written), "skip_or_error" (the difference between the two) and
#   "elapsed_time" (seconds between the init and finish timestamps)
def commit
  Embulk.logger.info("Documentdb output commit")
  seen = @recordnum
  written = @successnum
  {
    "total_records" => seen,
    "success"       => written,
    "skip_or_error" => seen - written,
    "elapsed_time"  => @finish_time - @start_time,
  }
end
finish()
click to toggle source
# File lib/embulk/output/documentdb.rb, line 150
# Embulk finish hook: logs the event and stamps the end time that commit
# subtracts from the init start time to compute "elapsed_time".
#
# @return [Time] the recorded finish time
def finish
  Embulk.logger.info("Documentdb output finish")
  @finish_time = Time.now
end
init()
click to toggle source
init is called from initialize(task, schema, index).
# File lib/embulk/output/documentdb.rb, line 64
# Per-task initialization, called from initialize(task, schema, index).
#
# Records the start time and resets the record counters. It then builds the
# DocumentDB client: partitioned or plain, depending on
# task['partitioned_collection']. Next it makes sure the target database and
# collection exist, creating them when auto_create_database /
# auto_create_collection allow it. Finally it caches the collection resource
# used by add.
#
# @raise [RuntimeError] when the database or collection is missing and
#   auto-creation is disabled
# @raise [StandardError] any other error raised while talking to DocumentDB;
#   every error is logged first and then re-raised so Embulk fails the task
def init
  Embulk.logger.info "Documentdb output init"
  @start_time = Time.now
  # initialization code:
  @recordnum = 0
  @successnum = 0

  begin
    client_class =
      if task['partitioned_collection']
        AzureDocumentDB::PartitionedCollectionClient
      else
        AzureDocumentDB::Client
      end
    @client = client_class.new(task['docdb_account_key'], task['docdb_endpoint'])

    # initial operations for database
    res = @client.find_databases_by_name(task['docdb_database'])
    if res[:body]["_count"].to_i == 0
      unless task['auto_create_database']
        raise "No database (#{task['docdb_database']})! Enable auto_create_database or create it by yourself"
      end
      # create new database as it doesn't exist
      @client.create_database(task['docdb_database'])
    end

    # initial operations for collection
    database_resource = @client.get_database_resource(task['docdb_database'])
    res = @client.find_collections_by_name(database_resource, task['docdb_collection'])
    if res[:body]["_count"].to_i == 0
      unless task['auto_create_collection']
        raise "No collection (#{task['docdb_collection']})! Enable auto_create_collection or create it by yourself"
      end
      # create new collection as it doesn't exist
      if task['partitioned_collection']
        partition_key_paths = ["/#{task['partition_key']}"]
        @client.create_collection(database_resource, task['docdb_collection'], partition_key_paths, task['offer_throughput'])
      else
        @client.create_collection(database_resource, task['docdb_collection'])
      end
    end

    @coll_resource = @client.get_collection_resource(database_resource, task['docdb_collection'])
  rescue StandardError => ex
    Embulk.logger.error { "Error: init: '#{ex}'" }
    # Re-raise rather than exit!. The former `rescue Exception` + `exit!`
    # also trapped signals and killed the process without running
    # ensure/at_exit handlers; propagating lets Embulk report the failure.
    raise
  end
end