class Tapsoob::CLI::DataStream

Public Instance Methods

pull(database_url, dump_path = nil) click to toggle source
# File lib/tapsoob/cli/data_stream.rb, line 20
def pull(database_url, dump_path = nil)
  op = Tapsoob::Operation.factory(:pull, database_url, dump_path, parse_opts(options))
  op.pull_data
end
push(database_url, dump_path = nil) click to toggle source
# File lib/tapsoob/cli/data_stream.rb, line 35
def push(database_url, dump_path = nil)
  # instantiate stuff
  data = []
  opts = parse_opts(options)

  # read data from dump_path or from STDIN
  if dump_path && Dir.exists?(dump_path)
    files = Dir[Pathname.new(dump_path).join("*.json")]
    files.each { |file| data << JSON.parse(File.read(file), symbolize_names: true) }
  else
    STDIN.each_line { |line| data << JSON.parse(line, symbolize_names: true) }
  end

  # import data
  data.each do |table|
    stream = Tapsoob::DataStream.factory(db(database_url, opts), {
      table_name: table[:table_name],
      chunksize: opts[:default_chunksize]
    }, { :"discard-identity" => opts[:"discard-identity"] || false, :purge => opts[:purge] || false, :debug => opts[:debug] })

    begin
      stream.import_rows(table)
    rescue Exception => e
      stream.log.debug e.message
    end
  end
end

Private Instance Methods

db(database_url, opts = {}) click to toggle source
# File lib/tapsoob/cli/data_stream.rb, line 96
def db(database_url, opts = {})
  @db ||= Sequel.connect(database_url)
  @db.loggers << Tapsoob.log if opts[:debug]

  # Set parameters
  if @db.uri =~ /oracle/i
    @db << "ALTER SESSION SET NLS_DATE_FORMAT='YYYY-MM-DD HH24:MI:SS'"
    @db << "ALTER SESSION SET NLS_TIMESTAMP_FORMAT='YYYY-MM-DD HH24:MI:SS:FF6'"
  end

  @db
end
parse_opts(options) click to toggle source
# File lib/tapsoob/cli/data_stream.rb, line 64
def parse_opts(options)
  # Default options
  opts = {
    progress: options[:progress],
    debug: options[:debug]
  }

  # Push only options
  opts[:purge] = options[:purge] if options.key?(:purge)
  opts[:"skip-duplicates"] = options[:"skip-duplicates"] if options.key?(:"skip-duplicates")
  opts[:"discard-identity"] = options[:"discard-identity"] if options.key?(:"discard-identity")

  # Default chunksize
  if options[:chunksize]
    opts[:default_chunksize] = (options[:chunksize] < 10 ? 10 : options[:chunksize])
  end

  # Regex filter
  opts[:table_filter] = options[:filter] if options[:filter]

  # Table filter
  if options[:tables]
    r_tables = options[:tables].collect { |t| "^#{t}" }.join("|")
    opts[:table_filter] = "#{r_tables}"
  end

  # Exclude tables
  opts[:exclude_tables] = options[:"exclude-tables"] if options[:"exclude-tables"]

  opts
end