class RedshiftConnector::Connector
Public Class Methods
new(exporter:, importer:, logger:)
click to toggle source
# File lib/redshift_connector/connector.rb, line 153 def initialize(exporter:, importer:, logger:) @exporter = exporter @importer = importer @logger = logger @bundle = nil end
transport_all( strategy: 'rename', schema:, table: nil, src_table: table, dest_table: table, columns:, bucket: nil, txn_id: nil, filter:, logger: RedshiftConnector.logger, quiet: false )
click to toggle source
# File lib/redshift_connector/connector.rb, line 115 def Connector.transport_all( strategy: 'rename', schema:, table: nil, src_table: table, dest_table: table, columns:, bucket: nil, txn_id: nil, filter:, logger: RedshiftConnector.logger, quiet: false ) logger = NullLogger.new if quiet bundle_params = DataFileBundleParams.new( bucket: bucket, schema: schema, table: src_table, txn_id: txn_id, filter: filter, logger: logger ) exporter = Exporter.for_table( bundle_params: bundle_params, schema: schema, table: src_table, columns: columns, logger: logger ) importer = Importer.for_rebuild( strategy: strategy, table: dest_table, columns: columns, logger: logger ) new(exporter: exporter, importer: importer, logger: logger) end
transport_all_from_s3( strategy: 'rename', table:, columns:, bucket: nil, prefix:, format:, filter: nil, logger: RedshiftConnector.logger, quiet: false )
click to toggle source
# File lib/redshift_connector/connector.rb, line 86 def Connector.transport_all_from_s3( strategy: 'rename', table:, columns:, bucket: nil, prefix:, format:, filter: nil, logger: RedshiftConnector.logger, quiet: false ) logger = NullLogger.new if quiet bundle = S3DataFileBundle.for_prefix( bucket: (bucket ? S3Bucket.get(bucket) : S3Bucket.default), prefix: prefix, format: format, filter: filter, logger: logger ) exporter = ImmediateExporter.new(bundle: bundle, logger: logger) importer = Importer.for_rebuild( strategy: strategy, table: table, columns: columns, logger: logger ) new(exporter: exporter, importer: importer, logger: logger) end
transport_delta( schema:, table: nil, src_table: table, dest_table: table, condition:, columns:, delete_cond: nil, upsert_columns: nil, bucket: nil, txn_id: nil, filter:, logger: RedshiftConnector.logger, quiet: false )
click to toggle source
# File lib/redshift_connector/connector.rb, line 41 def Connector.transport_delta( schema:, table: nil, src_table: table, dest_table: table, condition:, columns:, delete_cond: nil, upsert_columns: nil, bucket: nil, txn_id: nil, filter:, logger: RedshiftConnector.logger, quiet: false ) unless src_table and dest_table raise ArgumentError, "missing :table, :src_table or :dest_table" end logger = NullLogger.new if quiet bundle_params = DataFileBundleParams.new( bucket: bucket, schema: schema, table: src_table, txn_id: txn_id, filter: filter, logger: logger ) exporter = Exporter.for_table_delta( bundle_params: bundle_params, schema: schema, table: src_table, columns: columns, condition: condition, logger: logger ) importer = Importer.for_delta_upsert( table: dest_table, columns: columns, delete_cond: delete_cond, upsert_columns: upsert_columns, logger: logger ) new(exporter: exporter, importer: importer, logger: logger) end
transport_delta_from_s3( bucket: nil, prefix:, format:, filter: nil, table:, columns:, delete_cond: nil, upsert_columns: nil, logger: RedshiftConnector.logger, quiet: false )
click to toggle source
# File lib/redshift_connector/connector.rb, line 10 def Connector.transport_delta_from_s3( bucket: nil, prefix:, format:, filter: nil, table:, columns:, delete_cond: nil, upsert_columns: nil, logger: RedshiftConnector.logger, quiet: false ) logger = NullLogger.new if quiet bundle = S3DataFileBundle.for_prefix( bucket: (bucket ? S3Bucket.get(bucket) : S3Bucket.default), prefix: prefix, format: format, filter: filter, logger: logger ) exporter = ImmediateExporter.new(bundle: bundle, logger: logger) importer = Importer.for_delta_upsert( table: table, columns: columns, delete_cond: delete_cond, upsert_columns: upsert_columns, logger: logger ) new(exporter: exporter, importer: importer, logger: logger) end
Public Instance Methods
execute()
click to toggle source
# File lib/redshift_connector/connector.rb, line 168 def execute export if export_enabled? import if import_enabled? end
export()
click to toggle source
# File lib/redshift_connector/connector.rb, line 173 def export @logger.info "==== export task ==================================================" @bundle = @exporter.execute end
export_enabled?()
click to toggle source
# File lib/redshift_connector/connector.rb, line 160 def export_enabled? not ENV['IMPORT_ONLY'] end
import()
click to toggle source
# File lib/redshift_connector/connector.rb, line 178 def import @logger.info "==== import task ==================================================" @importer.execute(@bundle) end
import_enabled?()
click to toggle source
# File lib/redshift_connector/connector.rb, line 164 def import_enabled? not ENV['EXPORT_ONLY'] end