class RedshiftConnector::Connector

Public Class Methods

new(exporter:, importer:, logger:) click to toggle source
# File lib/redshift_connector/connector.rb, line 153
# Builds a connector from an exporter/importer pair.
#
# exporter:: object whose +execute+ result is kept as the bundle by #export
# importer:: object whose +execute+ receives that bundle in #import
# logger::   object that receives the task banner lines via +info+
def initialize(exporter:, importer:, logger:)
  @exporter, @importer, @logger = exporter, importer, logger
  # No bundle exists until #export has been run on this instance.
  @bundle = nil
end
transport_all( strategy: 'rename', schema:, table: nil, src_table: table, dest_table: table, columns:, bucket: nil, txn_id: nil, filter:, logger: RedshiftConnector.logger, quiet: false ) click to toggle source
# File lib/redshift_connector/connector.rb, line 115
# Builds a connector that exports a whole source table and rebuilds the
# destination table from the exported data.
#
# strategy::   forwarded to Importer.for_rebuild
# schema::     forwarded to DataFileBundleParams.new and Exporter.for_table
# table::      default for both +src_table+ and +dest_table+
# src_table::  table handed to the bundle params and the exporter
# dest_table:: table handed to the importer
# columns::    forwarded to both the exporter and the importer
# bucket::     forwarded to DataFileBundleParams.new
# txn_id::     forwarded to DataFileBundleParams.new
# filter::     forwarded to DataFileBundleParams.new
# logger::     defaults to RedshiftConnector.logger
# quiet::      when truthy, +logger+ is replaced by a fresh NullLogger
#
# Returns a new Connector wired with the exporter and importer built here.
#
# Raises ArgumentError when neither +table+ nor both of
# +src_table+/+dest_table+ were given, matching Connector.transport_delta.
def Connector.transport_all(
    strategy: 'rename',
    schema:,
    table: nil,
    src_table: table,
    dest_table: table,
    columns:,
    bucket: nil,
    txn_id: nil,
    filter:,
    logger: RedshiftConnector.logger,
    quiet: false
)
  # +table+ defaults to nil, so fail fast here instead of handing a nil
  # table name to the exporter/importer builders.
  unless src_table && dest_table
    raise ArgumentError, "missing :table, :src_table or :dest_table"
  end
  logger = NullLogger.new if quiet
  bundle_params = DataFileBundleParams.new(
    bucket: bucket,
    schema: schema,
    table: src_table,
    txn_id: txn_id,
    filter: filter,
    logger: logger
  )
  exporter = Exporter.for_table(
    bundle_params: bundle_params,
    schema: schema,
    table: src_table,
    columns: columns,
    logger: logger
  )
  importer = Importer.for_rebuild(
    strategy: strategy,
    table: dest_table,
    columns: columns,
    logger: logger
  )
  new(exporter: exporter, importer: importer, logger: logger)
end
transport_all_from_s3( strategy: 'rename', table:, columns:, bucket: nil, prefix:, format:, filter: nil, logger: RedshiftConnector.logger, quiet: false ) click to toggle source
# File lib/redshift_connector/connector.rb, line 86
# Builds a connector that rebuilds +table+ from data files already present
# under an S3 prefix.
#
# strategy:: forwarded to Importer.for_rebuild
# table::    forwarded to Importer.for_rebuild
# columns::  forwarded to Importer.for_rebuild
# bucket::   when truthy, resolved through S3Bucket.get; otherwise
#            S3Bucket.default is used
# prefix::   forwarded to S3DataFileBundle.for_prefix
# format::   forwarded to S3DataFileBundle.for_prefix
# filter::   forwarded to S3DataFileBundle.for_prefix
# logger::   defaults to RedshiftConnector.logger
# quiet::    when truthy, +logger+ is replaced by a fresh NullLogger
#
# Returns a new Connector whose exporter is an ImmediateExporter wrapping
# the bundle built here.
def Connector.transport_all_from_s3(
    strategy: 'rename',
    table:,
    columns:,
    bucket: nil,
    prefix:,
    format:,
    filter: nil,
    logger: RedshiftConnector.logger,
    quiet: false
)
  logger = quiet ? NullLogger.new : logger
  s3_bucket =
    if bucket
      S3Bucket.get(bucket)
    else
      S3Bucket.default
    end
  data_files = S3DataFileBundle.for_prefix(
    bucket: s3_bucket,
    prefix: prefix,
    format: format,
    filter: filter,
    logger: logger
  )
  new(
    exporter: ImmediateExporter.new(bundle: data_files, logger: logger),
    importer: Importer.for_rebuild(
      strategy: strategy,
      table: table,
      columns: columns,
      logger: logger
    ),
    logger: logger
  )
end
transport_delta( schema:, table: nil, src_table: table, dest_table: table, condition:, columns:, delete_cond: nil, upsert_columns: nil, bucket: nil, txn_id: nil, filter:, logger: RedshiftConnector.logger, quiet: false ) click to toggle source
# File lib/redshift_connector/connector.rb, line 41
# Builds a connector that exports the rows of a source table selected by
# +condition+ and upserts them into the destination table.
#
# schema::         forwarded to DataFileBundleParams.new and
#                  Exporter.for_table_delta
# table::          default for both +src_table+ and +dest_table+
# src_table::      table handed to the bundle params and the exporter
# dest_table::     table handed to the importer
# condition::      forwarded to Exporter.for_table_delta
# columns::        forwarded to both the exporter and the importer
# delete_cond::    forwarded to Importer.for_delta_upsert
# upsert_columns:: forwarded to Importer.for_delta_upsert
# bucket::         forwarded to DataFileBundleParams.new
# txn_id::         forwarded to DataFileBundleParams.new
# filter::         forwarded to DataFileBundleParams.new
# logger::         defaults to RedshiftConnector.logger
# quiet::          when truthy, +logger+ is replaced by a fresh NullLogger
#
# Returns a new Connector wired with the exporter and importer built here.
#
# Raises ArgumentError when +src_table+ or +dest_table+ is missing.
def Connector.transport_delta(
    schema:,
    table: nil,
    src_table: table,
    dest_table: table,
    condition:,
    columns:,
    delete_cond: nil,
    upsert_columns: nil,
    bucket: nil,
    txn_id: nil,
    filter:,
    logger: RedshiftConnector.logger,
    quiet: false
)
  raise ArgumentError, "missing :table, :src_table or :dest_table" unless src_table && dest_table

  logger = quiet ? NullLogger.new : logger
  params = DataFileBundleParams.new(
    bucket: bucket,
    schema: schema,
    table: src_table,
    txn_id: txn_id,
    filter: filter,
    logger: logger
  )
  # Exporter is built before the importer, as keyword arguments are
  # evaluated left to right.
  new(
    exporter: Exporter.for_table_delta(
      bundle_params: params,
      schema: schema,
      table: src_table,
      columns: columns,
      condition: condition,
      logger: logger
    ),
    importer: Importer.for_delta_upsert(
      table: dest_table,
      columns: columns,
      delete_cond: delete_cond,
      upsert_columns: upsert_columns,
      logger: logger
    ),
    logger: logger
  )
end
transport_delta_from_s3( bucket: nil, prefix:, format:, filter: nil, table:, columns:, delete_cond: nil, upsert_columns: nil, logger: RedshiftConnector.logger, quiet: false ) click to toggle source
# File lib/redshift_connector/connector.rb, line 10
# Builds a connector that upserts into +table+ from data files already
# present under an S3 prefix.
#
# bucket::         when truthy, resolved through S3Bucket.get; otherwise
#                  S3Bucket.default is used
# prefix::         forwarded to S3DataFileBundle.for_prefix
# format::         forwarded to S3DataFileBundle.for_prefix
# filter::         forwarded to S3DataFileBundle.for_prefix
# table::          forwarded to Importer.for_delta_upsert
# columns::        forwarded to Importer.for_delta_upsert
# delete_cond::    forwarded to Importer.for_delta_upsert
# upsert_columns:: forwarded to Importer.for_delta_upsert
# logger::         defaults to RedshiftConnector.logger
# quiet::          when truthy, +logger+ is replaced by a fresh NullLogger
#
# Returns a new Connector whose exporter is an ImmediateExporter wrapping
# the bundle built here.
def Connector.transport_delta_from_s3(
  bucket: nil,
  prefix:,
  format:,
  filter: nil,
  table:,
  columns:,
  delete_cond: nil,
  upsert_columns: nil,
  logger: RedshiftConnector.logger,
  quiet: false
)
  logger = quiet ? NullLogger.new : logger
  s3_bucket =
    if bucket
      S3Bucket.get(bucket)
    else
      S3Bucket.default
    end
  data_files = S3DataFileBundle.for_prefix(
    bucket: s3_bucket,
    prefix: prefix,
    format: format,
    filter: filter,
    logger: logger
  )
  new(
    exporter: ImmediateExporter.new(bundle: data_files, logger: logger),
    importer: Importer.for_delta_upsert(
      table: table,
      columns: columns,
      delete_cond: delete_cond,
      upsert_columns: upsert_columns,
      logger: logger
    ),
    logger: logger
  )
end

Public Instance Methods

execute() click to toggle source
# File lib/redshift_connector/connector.rb, line 168
# Runs the export step and then the import step, each only when its
# +*_enabled?+ predicate allows it.
#
# Returns the result of #import, or nil when the import step is skipped.
def execute
  if export_enabled?
    export
  end
  if import_enabled?
    import
  end
end
export() click to toggle source
# File lib/redshift_connector/connector.rb, line 173
# Logs the export banner, runs the exporter and remembers its result as
# the current bundle.
#
# Returns whatever the exporter's +execute+ returned.
def export
  banner = "==== export task =================================================="
  @logger.info(banner)
  produced = @exporter.execute
  @bundle = produced
end
export_enabled?() click to toggle source
# File lib/redshift_connector/connector.rb, line 160
# True unless the IMPORT_ONLY environment variable is set (to any value,
# including an empty string).
def export_enabled?
  ENV['IMPORT_ONLY'].nil?
end
import() click to toggle source
# File lib/redshift_connector/connector.rb, line 178
# Logs the import banner and hands the current bundle to the importer.
#
# The bundle is whatever @bundle holds at call time; within this class it
# is only assigned by #initialize (nil) and #export.
#
# Returns whatever the importer's +execute+ returned.
def import
  banner = "==== import task =================================================="
  @logger.info(banner)
  current_bundle = @bundle
  @importer.execute(current_bundle)
end
import_enabled?() click to toggle source
# File lib/redshift_connector/connector.rb, line 164
# True unless the EXPORT_ONLY environment variable is set (to any value,
# including an empty string).
def import_enabled?
  ENV['EXPORT_ONLY'].nil?
end