class RedshiftConnector::Connector
Constants
- DEFAULT_BATCH_SIZE
Public Class Methods
new(exporter:, importer:, filter: nil, logger:)
click to toggle source
# File lib/redshift_connector/connector.rb, line 150 def initialize(exporter:, importer:, filter: nil, logger:) @exporter = exporter @importer = importer @filter = filter @logger = logger @bundle = nil end
transport_all( strategy: 'rename', schema:, table: nil, src_table: table, dest_table: table, columns:, bucket: nil, txn_id: nil, filter:, logger: RedshiftConnector.logger, quiet: false )
click to toggle source
# File lib/redshift_connector/connector.rb, line 113 def Connector.transport_all( strategy: 'rename', schema:, table: nil, src_table: table, dest_table: table, columns:, bucket: nil, txn_id: nil, filter:, logger: RedshiftConnector.logger, quiet: false ) logger = NullLogger.new if quiet bundle_params = DataFileBundleParams.new( bucket: bucket, schema: schema, table: src_table, txn_id: txn_id, logger: logger ) exporter = Exporter.for_table( bundle_params: bundle_params, schema: schema, table: src_table, columns: columns, logger: logger ) importer = Importer.for_rebuild( strategy: strategy, table: dest_table, columns: columns, logger: logger ) new(exporter: exporter, importer: importer, filter: filter, logger: logger) end
transport_all_from_s3( strategy: 'rename', table:, columns:, bucket: nil, prefix:, format:, filter: nil, logger: RedshiftConnector.logger, quiet: false )
click to toggle source
# File lib/redshift_connector/connector.rb, line 85 def Connector.transport_all_from_s3( strategy: 'rename', table:, columns:, bucket: nil, prefix:, format:, filter: nil, logger: RedshiftConnector.logger, quiet: false ) logger = NullLogger.new if quiet bundle = S3DataFileBundle.for_prefix( bucket: (bucket ? S3Bucket.get(bucket) : S3Bucket.default), prefix: prefix, format: format, logger: logger ) exporter = ImmediateExporter.new(bundle: bundle, logger: logger) importer = Importer.for_rebuild( strategy: strategy, table: table, columns: columns, logger: logger ) new(exporter: exporter, importer: importer, filter: filter, logger: logger) end
transport_delta( schema:, table: nil, src_table: table, dest_table: table, condition:, columns:, delete_cond: nil, upsert_columns: nil, bucket: nil, txn_id: nil, filter:, logger: RedshiftConnector.logger, quiet: false )
click to toggle source
# File lib/redshift_connector/connector.rb, line 41 def Connector.transport_delta( schema:, table: nil, src_table: table, dest_table: table, condition:, columns:, delete_cond: nil, upsert_columns: nil, bucket: nil, txn_id: nil, filter:, logger: RedshiftConnector.logger, quiet: false ) unless src_table and dest_table raise ArgumentError, "missing :table, :src_table or :dest_table" end logger = NullLogger.new if quiet bundle_params = DataFileBundleParams.new( bucket: bucket, schema: schema, table: src_table, txn_id: txn_id, logger: logger ) exporter = Exporter.for_table_delta( bundle_params: bundle_params, schema: schema, table: src_table, columns: columns, condition: condition, logger: logger ) importer = Importer.for_delta_upsert( table: dest_table, columns: columns, delete_cond: delete_cond, upsert_columns: upsert_columns, logger: logger ) new(exporter: exporter, importer: importer, filter: filter, logger: logger) end
transport_delta_from_s3( bucket: nil, prefix:, format:, filter: nil, table:, columns:, delete_cond: nil, upsert_columns: nil, logger: RedshiftConnector.logger, quiet: false )
click to toggle source
# File lib/redshift_connector/connector.rb, line 11 def Connector.transport_delta_from_s3( bucket: nil, prefix:, format:, filter: nil, table:, columns:, delete_cond: nil, upsert_columns: nil, logger: RedshiftConnector.logger, quiet: false ) logger = NullLogger.new if quiet bundle = S3DataFileBundle.for_prefix( bucket: (bucket ? S3Bucket.get(bucket) : S3Bucket.default), prefix: prefix, format: format, logger: logger ) exporter = ImmediateExporter.new(bundle: bundle, logger: logger) importer = Importer.for_delta_upsert( table: table, columns: columns, delete_cond: delete_cond, upsert_columns: upsert_columns, logger: logger ) new(exporter: exporter, importer: importer, filter: filter, logger: logger) end
Public Instance Methods
execute()
click to toggle source
# File lib/redshift_connector/connector.rb, line 166 def execute export if export_enabled? import if import_enabled? end
export()
click to toggle source
# File lib/redshift_connector/connector.rb, line 171 def export @logger.info "==== export task ==================================================" @bundle = @exporter.execute end
export_enabled?()
click to toggle source
# File lib/redshift_connector/connector.rb, line 158 def export_enabled? not ENV['IMPORT_ONLY'] end
import()
click to toggle source
# File lib/redshift_connector/connector.rb, line 178 def import @logger.info "==== import task ==================================================" r = DataFileBundleReader.new( @bundle, filter: @filter, batch_size: DEFAULT_BATCH_SIZE, logger: @logger ) @importer.execute(r) end
import_enabled?()
click to toggle source
# File lib/redshift_connector/connector.rb, line 162 def import_enabled? not ENV['EXPORT_ONLY'] end