class Object

Public Instance Methods

csv_to_db(file_name, table_name) click to toggle source
# File lib/fns/csv_to_db.rb, line 3
def csv_to_db(file_name, table_name)
  db_env_name = 'DB'
  db = DataPipe2.get_fluid_db(db_env_name)
  db.execute("TRUNCATE TABLE #{table_name}", [])

  csv = CSV.read(file_name)
  columns = csv.shift
  sql = "INSERT INTO #{table_name} ( #{columns.join(',')} )
          VALUES (#{Array.new(columns.length, '?').join(',')})"
  csv.each do |row|
    db.execute(sql, row)
  end
  DataPipe2.log "#{table_name}: #{csv.length}", true
end
db_to_csv(sql, path, name) click to toggle source
# File lib/fns/db_to_csv.rb, line 3
def db_to_csv(sql, path, name)
  db_env_name = 'DB'
  db = DataPipe2.get_fluid_db(db_env_name)

  file_path = "#{path}/#{name}"
  File.delete(file_path) if File.exist?(file_path)

  rst = db.queryForResultset(sql, [])

  CSV.open(file_path, 'w') do |csv|
    csv << rst[0].keys
    rst.each { |r| csv << r.values }
  end
end
db_to_db(select_sql, table_name, columns) click to toggle source
# File lib/fns/db_to_db.rb, line 2
def db_to_db(select_sql, table_name, columns)
  s = FluidDb::Db(ENV['SOURCE_DB']) if s.nil? # source
  d = FluidDb::Db(ENV['DEST_DB']) if d.nil? # destination

  insert_cmd = %{
    INSERT INTO #{table_name} (#{columns})
    VALUES (#{Array.new(columns.split(',').length, '?').join(',')})
  }

  rst = s.queryForResultset(select_sql)
  rst.each do |row|
    d.execute(insert_cmd, row.values)
  end

  DataPipe2.log "#{table_name}: #{rst.length}"
end
db_to_json(sql, path, name) click to toggle source
# File lib/fns/db_to_json.rb, line 4
def db_to_json(sql, path, name)
  # Dir.mkdir(path) unless Dir.exists?(path)
  db_env_name = 'DB'
  db = DataPipe2.get_fluid_db(db_env_name)

  file_path = "#{path}/#{name}"
  File.delete(file_path) if File.exist?(file_path)

  rst = db.queryForResultset(sql, [])
  File.write(file_path, rst.to_json)
end
db_to_pgsql(select_sql, table_name, columns) click to toggle source
# File lib/fns/db_to_pgsql.rb, line 2
def db_to_pgsql(select_sql, table_name, columns)
  s = FluidDb::Db(ENV['SOURCE_DB']) # source
  d = FluidDb::Db(ENV['PGSQL']) # destination

  d.connection.exec("TRUNCATE TABLE #{table_name}")
  copy_cmd = %{
    COPY #{table_name} (#{columns})
    FROM STDIN
    WITH DELIMITER AS '|'
    CSV;
  }
  d.connection.exec(copy_cmd)

  rst = s.queryForResultset(select_sql)
  count = 0
  rst.each do |r|
    count += 1
    print '.' if count % 100_000 == 0
    d.connection.put_copy_data "#{r.values.join('|')}\n"
  end
  puts ""
  d.connection.put_copy_end

  DataPipe2.log "#{table_name}: #{count}"
end
mssql_to_pgsql(select_sql, table_name, columns) click to toggle source
# File lib/fns/mssql_to_pgsql.rb, line 2
def mssql_to_pgsql(select_sql, table_name, columns)
  s = FluidDb::Db(ENV['MSSQL']) # source
  d = FluidDb::Db(ENV['PGSQL']) # destination

  d.connection.exec("TRUNCATE TABLE #{table_name}")
  copy_cmd = %{
    COPY #{table_name} (#{columns})
    FROM STDIN
    WITH DELIMITER AS '|'
    CSV;
  }
  d.connection.exec(copy_cmd)

  results = s.connection.execute(select_sql)

  count = 0
  results.each(as: :array, cache_rows: false) do |r|
    count += 1
    print '.' if count % 100_000 == 0
    d.connection.put_copy_data "#{r.join('|')}\n"
  end
  puts ""
  d.connection.put_copy_end

  DataPipe2.log "#{table_name}: #{count}"
end
scp_to_local(scp_uri_string, remote_done_path, local_path, local_working_path = nil) click to toggle source
# File lib/fns/scp_to_local.rb, line 5
def scp_to_local(scp_uri_string,
                 remote_done_path,
                 local_path,
                 local_working_path = nil)
  scp_uri = URI.parse(scp_uri_string)
  remote_ready_path = "#{scp_uri.path}"

  local_working_path = "/tmp/#{local_path}" if local_working_path.nil?
  FileUtils.mkdir_p local_working_path
  # Check we can write to local path

  Net::SFTP.start(scp_uri.host, scp_uri.user) do |sftp|
    # Check we can read and write from remote_ready_path
    # Check we can write to remote_done_path
    # puts "remote_ready_path: #{sftp.lstat!('remote_ready_path').permissions}"
    # puts "remote_done_path: #{sftp.lstat!('remote_done_path').permissions}"
    sftp.dir.foreach(remote_ready_path) do |entry|
      if entry.file?
        puts entry.name
        sftp.download!(
          "#{remote_ready_path}/#{entry.name}",
          "#{local_working_path}/#{entry.name}")

        FileUtils.mv("#{local_working_path}/#{entry.name}", local_path)

        sftp.rename!("#{remote_ready_path}/#{entry.name}",
                     "#{remote_done_path}/#{entry.name}")
      end
    end
  end

  FileUtils.rm_rf local_working_path
end
smb_to_local(smb_uri_string, local_folder) click to toggle source
# File lib/fns/smb_to_local.rb, line 5
def smb_to_local(smb_uri_string, local_folder)
  # encode the string
  uri = URI.parse(URI.escape(smb_uri_string))
  domain = nil
  unless uri.query.nil?
    params = CGI.parse(uri.query)
    domain = params['domain'][0]
  end
  p "#{domain}, #{uri.host}, #{uri.user}, #{uri.password}"
  client = Sambal::Client.new(domain: domain, host: uri.host,
                              share: 'Data', user: uri.user, password: uri.password)
  client.ls(remote_dirpath).each do |remote_filepath|
    local_filepath = local_folder
    client.get(remote_filepath, local_filepath) # downloads file from server
    client.del(remote_filepath) # deletes files from server
  end
  client.close # closes connection
end