class Object
Public Instance Methods
csv_to_db(file_name, table_name)
click to toggle source
# File lib/fns/csv_to_db.rb, line 3
#
# Replaces the contents of +table_name+ with the rows of a CSV file.
#
# The first CSV row supplies the column names; every following row is
# inserted with one positional placeholder per column.
#
# file_name::  path of the CSV file to load
# table_name:: table to truncate and refill (interpolated into the SQL as-is)
#
# Raises ArgumentError when the file has no header row, and whatever
# CSV.read raises when the file cannot be read. In both cases the table is
# left untouched.
def csv_to_db(file_name, table_name)
  # Read and validate the file BEFORE truncating, so that a missing, empty
  # or headerless file can never leave the destination table wiped.
  rows = CSV.read(file_name)
  columns = rows.shift
  raise ArgumentError, "#{file_name}: no header row" if columns.nil? || columns.empty?

  db = DataPipe2.get_fluid_db('DB')
  db.execute("TRUNCATE TABLE #{table_name}", [])

  placeholders = Array.new(columns.length, '?').join(',')
  sql = "INSERT INTO #{table_name} ( #{columns.join(',')} ) VALUES (#{placeholders})"
  rows.each { |row| db.execute(sql, row) }

  DataPipe2.log "#{table_name}: #{rows.length}", true
end
db_to_csv(sql, path, name)
click to toggle source
# File lib/fns/db_to_csv.rb, line 3
#
# Runs +sql+ against the 'DB' connection and writes the result set to
# "<path>/<name>" as CSV, replacing any existing file.
#
# The header line is taken from the keys of the first row. When the query
# returns no rows there is nothing to derive a header from, so an empty
# file is written instead of failing.
#
# sql::  query to run (executed with an empty parameter list)
# path:: directory the file is written to
# name:: file name inside +path+
def db_to_csv(sql, path, name)
  db = DataPipe2.get_fluid_db('DB')
  file_path = "#{path}/#{name}"
  File.delete(file_path) if File.exist?(file_path)

  rst = db.queryForResultset(sql, [])
  CSV.open(file_path, 'w') do |csv|
    first_row = rst[0]
    # Guard against an empty result set: rst[0] is nil then.
    csv << first_row.keys unless first_row.nil?
    rst.each { |row| csv << row.values }
  end
end
db_to_db(select_sql, table_name, columns)
click to toggle source
# File lib/fns/db_to_db.rb, line 2
#
# Copies the rows returned by +select_sql+ on the source database into
# +table_name+ on the destination database, one INSERT per row.
#
# Connections are opened from the SOURCE_DB and DEST_DB environment
# variables on every call.
#
# select_sql:: query run against the source connection
# table_name:: destination table (interpolated into the SQL as-is)
# columns::    comma-separated destination column list; its length decides
#              how many placeholders the INSERT gets
def db_to_db(select_sql, table_name, columns)
  # The original guarded these with `if s.nil?` / `if d.nil?`, which is
  # always true for a fresh local; the connections are simply opened here.
  source = FluidDb::Db(ENV['SOURCE_DB'])
  destination = FluidDb::Db(ENV['DEST_DB'])

  placeholders = Array.new(columns.split(',').length, '?').join(',')
  insert_cmd = %{
    INSERT INTO #{table_name} (#{columns})
    VALUES (#{placeholders})
  }

  # Explicit empty parameter list, matching the other helpers' calls.
  rst = source.queryForResultset(select_sql, [])
  rst.each { |row| destination.execute(insert_cmd, row.values) }

  DataPipe2.log "#{table_name}: #{rst.length}"
end
db_to_json(sql, path, name)
click to toggle source
# File lib/fns/db_to_json.rb, line 4
#
# Runs +sql+ against the 'DB' connection and dumps the whole result set,
# serialised with #to_json, into "<path>/<name>".
#
# Any file already at that location is removed first. The directory is not
# created here (the mkdir below is intentionally left disabled).
#
# sql::  query to run (executed with an empty parameter list)
# path:: directory the file is written to
# name:: file name inside +path+
def db_to_json(sql, path, name)
  # Dir.mkdir(path) unless Dir.exists?(path)
  connection = DataPipe2.get_fluid_db('DB')

  target = "#{path}/#{name}"
  File.delete(target) if File.exist?(target)

  File.write(target, connection.queryForResultset(sql, []).to_json)
end
db_to_pgsql(select_sql, table_name, columns)
click to toggle source
# File lib/fns/db_to_pgsql.rb, line 2
require 'csv' # CSV.generate_line encodes the rows sent to COPY

# Bulk-copies the rows returned by +select_sql+ on the source database
# (SOURCE_DB) into +table_name+ on the PostgreSQL database (PGSQL) using
# COPY ... FROM STDIN in CSV mode with '|' as delimiter.
#
# The destination table is truncated first. A '.' is printed for every
# 100_000 rows as a progress marker.
#
# select_sql:: query run against the source connection
# table_name:: destination table (interpolated into the SQL as-is)
# columns::    comma-separated destination column list
def db_to_pgsql(select_sql, table_name, columns)
  source = FluidDb::Db(ENV['SOURCE_DB'])
  pg = FluidDb::Db(ENV['PGSQL']).connection

  pg.exec("TRUNCATE TABLE #{table_name}")
  copy_cmd = %{
    COPY #{table_name} (#{columns})
    FROM STDIN
    WITH DELIMITER AS '|'
    CSV;
  }
  pg.exec(copy_cmd)

  count = 0
  source.queryForResultset(select_sql, []).each do |row|
    count += 1
    print '.' if (count % 100_000).zero?
    # The COPY is declared as CSV, so every line must be real CSV: a plain
    # join('|') breaks on values containing '|', '"' or a newline.
    pg.put_copy_data(CSV.generate_line(row.values, col_sep: '|'))
  end
  puts ""
  pg.put_copy_end
  # Fetch the COPY result so that a server-side failure raises here instead
  # of being logged as a successful load.
  pg.get_last_result

  DataPipe2.log "#{table_name}: #{count}"
end
mssql_to_pgsql(select_sql, table_name, columns)
click to toggle source
# File lib/fns/mssql_to_pgsql.rb, line 2
require 'csv' # CSV.generate_line encodes the rows sent to COPY

# Bulk-copies the rows returned by +select_sql+ on the SQL Server database
# (MSSQL) into +table_name+ on the PostgreSQL database (PGSQL) using
# COPY ... FROM STDIN in CSV mode with '|' as delimiter.
#
# The destination table is truncated first. Source rows are iterated as
# arrays with row caching switched off, and a '.' is printed for every
# 100_000 rows as a progress marker.
#
# select_sql:: query run against the source connection
# table_name:: destination table (interpolated into the SQL as-is)
# columns::    comma-separated destination column list
def mssql_to_pgsql(select_sql, table_name, columns)
  source = FluidDb::Db(ENV['MSSQL'])
  pg = FluidDb::Db(ENV['PGSQL']).connection

  pg.exec("TRUNCATE TABLE #{table_name}")
  copy_cmd = %{
    COPY #{table_name} (#{columns})
    FROM STDIN
    WITH DELIMITER AS '|'
    CSV;
  }
  pg.exec(copy_cmd)

  results = source.connection.execute(select_sql)
  count = 0
  results.each(as: :array, cache_rows: false) do |row|
    count += 1
    print '.' if (count % 100_000).zero?
    # The COPY is declared as CSV, so every line must be real CSV: a plain
    # join('|') breaks on values containing '|', '"' or a newline.
    pg.put_copy_data(CSV.generate_line(row, col_sep: '|'))
  end
  puts ""
  pg.put_copy_end
  # Fetch the COPY result so that a server-side failure raises here instead
  # of being logged as a successful load.
  pg.get_last_result

  DataPipe2.log "#{table_name}: #{count}"
end
scp_to_local(scp_uri_string, remote_done_path, local_path, local_working_path = nil)
click to toggle source
# File lib/fns/scp_to_local.rb, line 5
#
# Downloads every regular file found in the remote directory named by the
# path of +scp_uri_string+ and, after each download, moves the remote file
# to +remote_done_path+.
#
# Each file is first downloaded into a working directory and then moved
# into +local_path+. The working directory is removed when the method
# finishes, whether it succeeded or raised.
#
# scp_uri_string::     URI whose host, user and path select the SFTP server,
#                      login and remote directory
# remote_done_path::   remote directory processed files are renamed into
# local_path::         local destination for the downloaded files
# local_working_path:: staging directory; defaults to "/tmp/<local_path>"
def scp_to_local(scp_uri_string, remote_done_path, local_path, local_working_path = nil)
  scp_uri = URI.parse(scp_uri_string)
  remote_ready_path = scp_uri.path.to_s

  local_working_path ||= "/tmp/#{local_path}"
  FileUtils.mkdir_p(local_working_path)

  begin
    # Check we can write to local path
    Net::SFTP.start(scp_uri.host, scp_uri.user) do |sftp|
      # Check we can read and write from remote_ready_path
      # Check we can write to remote_done_path
      # puts "remote_ready_path: #{sftp.lstat!('remote_ready_path').permissions}"
      # puts "remote_done_path: #{sftp.lstat!('remote_done_path').permissions}"
      sftp.dir.foreach(remote_ready_path) do |entry|
        next unless entry.file?

        puts entry.name
        remote_file = "#{remote_ready_path}/#{entry.name}"
        staged_file = "#{local_working_path}/#{entry.name}"

        sftp.download!(remote_file, staged_file)
        FileUtils.mv(staged_file, local_path)
        # Only after the local copy is in place is the remote file moved.
        sftp.rename!(remote_file, "#{remote_done_path}/#{entry.name}")
      end
    end
  ensure
    # Previously skipped when a transfer raised, leaving the staging
    # directory (and any partial download) behind.
    FileUtils.rm_rf(local_working_path)
  end
end
smb_to_local(smb_uri_string, local_folder)
click to toggle source
# File lib/fns/smb_to_local.rb, line 5
#
# Connects to the 'Data' share on the SMB host named in +smb_uri_string+,
# then fetches and deletes every item listed under +remote_dirpath+.
#
# The host, user and password come from the URI; an optional +domain+
# query parameter (e.g. "?domain=CORP") selects the domain.
#
# smb_uri_string:: SMB URI; it is percent-escaped before parsing so it may
#                  contain characters such as spaces
# local_folder::   passed to the client as the download target
#
# NOTE(review): +remote_dirpath+ is not defined in this method; it has to
# be provided elsewhere — confirm where it comes from.
# NOTE(review): confirm that the items yielded by +client.ls+ are what
# +client.get+ / +client.del+ expect, and that +get+ accepts a folder as
# its target.
def smb_to_local(smb_uri_string, local_folder)
  # encode the string
  # URI.escape was removed in Ruby 3.0; the RFC 2396 parser still provides
  # the same escaping.
  uri = URI.parse(URI::RFC2396_Parser.new.escape(smb_uri_string))

  domain = nil
  unless uri.query.nil?
    # First "domain" pair of the query string, if any.
    pair = URI.decode_www_form(uri.query).assoc('domain')
    domain = pair && pair.last
  end

  # Connection summary for debugging; the password is deliberately not
  # printed.
  p "#{domain}, #{uri.host}, #{uri.user}"

  client = Sambal::Client.new(domain: domain, host: uri.host, share: 'Data',
                              user: uri.user, password: uri.password)
  begin
    client.ls(remote_dirpath).each do |remote_filepath|
      client.get(remote_filepath, local_folder) # downloads file from server
      client.del(remote_filepath) # deletes files from server
    end
  ensure
    client.close # closes connection, also when a transfer fails
  end
end