class Embulk::Output::Wendelin
Public Class Methods
transaction(config, schema, count) { |task| ... }
click to toggle source
# File lib/embulk/output/wendelin.rb, line 10
# Builds the task hash from the plugin configuration and runs the output
# tasks once (non-resumable mode).
#
# Recognised settings, all read as strings:
#   tag            - optional, defaults to nil
#   streamtool_uri - required
#   user           - optional, defaults to nil
#   password       - optional, defaults to nil
#   path_prefix    - optional, defaults to nil
#
# schema, count and control are accepted but not used directly; the caller's
# block is invoked through yield with the task hash.
#
# Returns an empty hash (the original named it next_config_diff).
def self.transaction(config, schema, count, &control)
  # configuration code:
  task = {
    "tag" => config.param("tag", :string, default: nil),
    # where Wendelin's Input Stream Tool is located,
    # ex http://example.com/erp5/portal_ingestion_policies/example_ingestion
    "streamtool_uri" => config.param("streamtool_uri", :string),
    # credentials used to authenticate against Wendelin;
    # by default credentials are not used
    # TODO user/password -> certificate
    # The option key must be spelled `default`: a misspelled key is not
    # recognised as a default, which turns "user" into a required setting.
    "user" => config.param("user", :string, default: nil),
    "password" => config.param("password", :string, default: nil),
    "path_prefix" => config.param("path_prefix", :string, default: nil),
  }

  # resumable output:
  # resume(task, schema, count, &control)

  # non-resumable output: the task reports returned by the block are not used.
  yield(task)

  # Nothing to carry over to the next run.
  {}
end
Public Instance Methods
abort()
click to toggle source
# File lib/embulk/output/wendelin.rb, line 90
# Intentionally empty: this plugin does nothing on abort.
# NOTE(review): presumably an Embulk output-plugin lifecycle hook - confirm
# against the OutputPlugin API.
def abort; end
add(page)
click to toggle source
# File lib/embulk/output/wendelin.rb, line 66
# Sends every record of +page+ to Wendelin.
#
# Each record is indexed positionally: record[0] is Base64-encoded data and
# record[1] is an optional path-like string. The ingestion tag is @tag,
# optionally followed by "." and the path with its first character dropped
# and every file separator replaced by ".".
# NOTE(review): the dropped first character is presumably a leading
# separator - confirm against the input plugin that produces these records.
#
# A nil @tag or an empty path no longer raises: the tag is assembled from
# whichever parts are present.
def add(page)
  # Loop-invariant, so looked up once.
  separator = File::ALT_SEPARATOR || File::SEPARATOR

  page.each do |record|
    tag = @tag
    unless record[1].nil?
      # [1..-1] returns nil for an empty string; compact drops that and a nil @tag.
      path_part = record[1].gsub(separator, '.')[1..-1]
      tag = [tag, path_part].compact.join('.')
    end

    Embulk.logger.info { tag }
    @wendelin.ingest(tag, Base64.decode64(record[0]))
  end
end
close()
click to toggle source
# File lib/embulk/output/wendelin.rb, line 63
# Intentionally empty: this plugin releases nothing on close.
# NOTE(review): presumably an Embulk output-plugin lifecycle hook - confirm
# against the OutputPlugin API.
def close; end
commit()
click to toggle source
# File lib/embulk/output/wendelin.rb, line 93
# Returns the task report for this task, which is always an empty hash.
def commit
  {}
end
finish()
click to toggle source
# File lib/embulk/output/wendelin.rb, line 87
# Intentionally empty: this plugin has nothing to flush on finish.
# NOTE(review): presumably an Embulk output-plugin lifecycle hook - confirm
# against the OutputPlugin API.
def finish; end
init()
click to toggle source
# Resumable counterpart of the transaction step: hands the already-built
# +task+ to the caller's block and returns an empty hash (the original named
# it next_config_diff).
#
# schema, count and control are accepted but not used directly; the block is
# invoked through yield. The task reports it returns are not used.
def self.resume(task, schema, count, &control)
  yield(task)

  # Nothing to carry over to the next run.
  {}
end
# File lib/embulk/output/wendelin.rb, line 48
# Reads the task settings and builds the Wendelin client.
#
# Stores task["tag"] in @tag and a WendelinClient in @wendelin. The client
# receives task["streamtool_uri"], a credentials hash and Embulk.logger.
# The credentials hash holds "user" and "password" only when task["user"]
# is set; otherwise it is empty.
def init
  @tag = task["tag"]
  endpoint = task["streamtool_uri"]

  login = task["user"]
  credentials = login.nil? ? {} : { "user" => login, "password" => task["password"] }

  @wendelin = WendelinClient.new(endpoint, credentials, Embulk.logger)
end