class Tapsoob::CLI::DataStream
Public Instance Methods
pull(database_url, dump_path = nil)
click to toggle source
# File lib/tapsoob/cli/data_stream.rb, line 20 def pull(database_url, dump_path = nil) op = Tapsoob::Operation.factory(:pull, database_url, dump_path, parse_opts(options)) op.pull_data end
push(database_url, dump_path = nil)
click to toggle source
# File lib/tapsoob/cli/data_stream.rb, line 35 def push(database_url, dump_path = nil) # instantiate stuff data = [] opts = parse_opts(options) # read data from dump_path or from STDIN if dump_path && Dir.exists?(dump_path) files = Dir[Pathname.new(dump_path).join("*.json")] files.each { |file| data << JSON.parse(File.read(file), symbolize_names: true) } else STDIN.each_line { |line| data << JSON.parse(line, symbolize_names: true) } end # import data data.each do |table| stream = Tapsoob::DataStream.factory(db(database_url, opts), { table_name: table[:table_name], chunksize: opts[:default_chunksize] }, { :"discard-identity" => opts[:"discard-identity"] || false, :purge => opts[:purge] || false, :debug => opts[:debug] }) begin stream.import_rows(table) rescue Exception => e stream.log.debug e.message end end end
Private Instance Methods
db(database_url, opts = {})
click to toggle source
# File lib/tapsoob/cli/data_stream.rb, line 96 def db(database_url, opts = {}) @db ||= Sequel.connect(database_url) @db.loggers << Tapsoob.log if opts[:debug] # Set parameters if @db.uri =~ /oracle/i @db << "ALTER SESSION SET NLS_DATE_FORMAT='YYYY-MM-DD HH24:MI:SS'" @db << "ALTER SESSION SET NLS_TIMESTAMP_FORMAT='YYYY-MM-DD HH24:MI:SS:FF6'" end @db end
parse_opts(options)
click to toggle source
# File lib/tapsoob/cli/data_stream.rb, line 64 def parse_opts(options) # Default options opts = { progress: options[:progress], debug: options[:debug] } # Push only options opts[:purge] = options[:purge] if options.key?(:purge) opts[:"skip-duplicates"] = options[:"skip-duplicates"] if options.key?(:"skip-duplicates") opts[:"discard-identity"] = options[:"discard-identity"] if options.key?(:"discard-identity") # Default chunksize if options[:chunksize] opts[:default_chunksize] = (options[:chunksize] < 10 ? 10 : options[:chunksize]) end # Regex filter opts[:table_filter] = options[:filter] if options[:filter] # Table filter if options[:tables] r_tables = options[:tables].collect { |t| "^#{t}" }.join("|") opts[:table_filter] = "#{r_tables}" end # Exclude tables opts[:exclude_tables] = options[:"exclude-tables"] if options[:"exclude-tables"] opts end