class Toku::Anonymizer
Constants
- COLUMN_FILTER_MAP
Default column filters
- ROW_FILTER_MAP
Default row filters
- SCHEMA_DUMP_PATH
Path the pg_dump schema-only dump is written to
- THREADPOOL_SIZE
Size of the fixed thread pool used to copy tables
Attributes
column_filters[RW]
row_filters[RW]
Public Class Methods
new(config_file_path, column_filters = {}, row_filters = {})
@param config_file_path [String] path of the config file
@param column_filters [Hash] custom column filters, merged beneath COLUMN_FILTER_MAP
@param row_filters [Hash] custom row filters, merged beneath ROW_FILTER_MAP
# File lib/toku.rb, line 34
def initialize(config_file_path, column_filters = {}, row_filters = {})
  @config = YAML.load(ERB.new(File.read(config_file_path)).result)
  @threadpool = Concurrent::FixedThreadPool.new(THREADPOOL_SIZE)
  self.column_filters = column_filters.merge(COLUMN_FILTER_MAP)
  self.row_filters = row_filters.merge(ROW_FILTER_MAP)
  Sequel::Database.extension(:pg_streaming)
end
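A minimal construction sketch, assuming the config lives at config/toku.yml (a hypothetical path) and only the default filters are needed:

  # Hypothetical usage sketch; the config path is an assumption, not part of the gem.
  require 'toku'

  anonymizer = Toku::Anonymizer.new('config/toku.yml')

Custom filter maps passed as the second and third arguments are merged beneath the defaults, so COLUMN_FILTER_MAP and ROW_FILTER_MAP win on key collisions.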
Public Instance Methods
dump_schema(uri)
@param uri [String]
@return [void]
# File lib/toku.rb, line 128
def dump_schema(uri)
  FileUtils.mkdir_p 'tmp'
  host = URI(uri).host
  user = URI(uri).user
  password = URI(uri).password || ENV["PGPASSWORD"]
  port = URI(uri).port || 5432
  db_name = URI(uri).path.tr("/", "")
  raise "pg_dump schema dump failed" unless system(
    "PGPASSWORD=#{password} pg_dump -s -h #{host} -p #{port} -U #{user} #{db_name} > #{SCHEMA_DUMP_PATH}"
  )
end
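A hedged call sketch; the URI is hypothetical, and pg_dump must be available on the PATH:

  # Hypothetical invocation; the password comes from the URI or PGPASSWORD,
  # and the schema-only dump is written to SCHEMA_DUMP_PATH.
  anonymizer.dump_schema('postgres://alice:secret@db.example.com:5432/app_production')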
filter_class(type, symbol)
@param type [Hash]
@param symbol [Symbol]
@return [Class]
# File lib/toku.rb, line 144
def filter_class(type, symbol)
  raise "Please provide a filter for #{symbol}" if type[symbol].nil?
  type[symbol]
end
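Together with transform below, this lookup implies the column-filter contract: the resolved class is instantiated with the column value and an options hash, and #call returns the replacement value. A minimal sketch under that assumption; the class, registration key, and option names are hypothetical:

  # Hypothetical column filter; only new(value, options) and #call are assumed.
  class RedactEmail
    def initialize(value, options)
      @value = value
      @domain = options['domain'] || 'example.com' # YAML options arrive with string keys
    end

    # Returns the value written to the destination column.
    def call
      "user-#{@value.to_s.hash.abs}@#{@domain}"
    end
  end

  # Registered under a Symbol key when constructing the anonymizer:
  # Toku::Anonymizer.new('config/toku.yml', { redact_email: RedactEmail })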
process_table(table, source_connection, destination_connection)
@param table [Symbol]
@param source_connection [Sequel::Postgres::Database]
@param destination_connection [Sequel::Postgres::Database]
@return [void]
# File lib/toku.rb, line 106
def process_table(table, source_connection, destination_connection)
  row_enumerator = source_connection[table].stream.lazy

  @config[table.to_s]['rows'].each do |f|
    row_filter =
      if f.is_a? String
        self.row_filters[f.to_sym].new({})
      elsif f.is_a? Hash
        self.row_filters[f.keys.first.to_sym].new(f.values.first)
      end

    row_enumerator = row_filter.call(row_enumerator)
  end

  destination_connection.run("ALTER TABLE #{table} DISABLE TRIGGER ALL;")
  destination_connection.copy_into(
    table,
    data: row_enumerator.map { |row| transform(row, table) },
    format: :csv
  )
  destination_connection.run("ALTER TABLE #{table} ENABLE TRIGGER ALL;")

  count = destination_connection[table].count
  @global_object_count += count
  @tables_processed_count += 1

  puts "Toku: copied #{count} objects into #{table} #{count != 0 ? ':)' : ':|'}"
end
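The two constructor calls above imply the row-filter contract: the class takes an options hash, and #call receives the lazy row enumerator and must return an enumerator of rows. A sketch under that assumption; the class and column names are hypothetical:

  # Hypothetical row filter; only new(options) and #call(enumerator) are assumed.
  class DropSoftDeleted
    def initialize(options)
      @options = options
    end

    # Receives the lazy row enumerator and returns a (still lazy) filtered one.
    def call(row_enumerator)
      row_enumerator.reject { |row| row[:deleted_at] }
    end
  end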
row_filters?(table)
Are there row filters specified for this table?
@param table [Symbol]
@return [Boolean]
# File lib/toku.rb, line 152
def row_filters?(table)
  !@config[table.to_s]['rows'].nil? && @config[table.to_s]['rows'].any?
end
run(uri_db_source, uri_db_destination)
@param uri_db_source [String] URI of the DB to be anonymized
@param uri_db_destination [String] URI of the destination DB
@return [void]
# File lib/toku.rb, line 45
def run(uri_db_source, uri_db_destination)
  begin_time_stamp = Time.now
  @global_object_count = 0
  @tables_processed_count = 0

  source_db = Sequel.connect(uri_db_source)
  dump_schema(uri_db_source)

  parsed_destination_uri = URI(uri_db_destination)
  destination_db_name = parsed_destination_uri.path.tr("/", "")
  destination_connection = Sequel.connect(
    "postgres://#{parsed_destination_uri.user}:#{parsed_destination_uri.password}" \
    "@#{parsed_destination_uri.host}:#{parsed_destination_uri.port || 5432}/template1"
  )
  destination_connection.run("DROP DATABASE IF EXISTS #{destination_db_name}")
  destination_connection.run("CREATE DATABASE #{destination_db_name}")
  destination_connection.disconnect

  destination_db = Sequel.connect(uri_db_destination)
  destination_db.run(File.read(SCHEMA_DUMP_PATH))

  destination_pool = Sequel::ThreadedConnectionPool.new(destination_db)
  source_pool = Sequel::ThreadedConnectionPool.new(source_db)

  source_db.tables.each do |t|
    if !row_filters?(t) && @config[t.to_s]['columns'].count < source_db.from(t).columns.count
      raise Toku::ColumnFilterMissingError
    end

    @threadpool.post do
      destination_pool.hold do |destination_connection|
        source_pool.hold do |source_connection|
          process_table(
            t,
            source_connection.instance_variable_get(:@db),
            destination_connection.instance_variable_get(:@db)
          )
        end
      end
    end
  end

  @threadpool.shutdown
  @threadpool.wait_for_termination

  source_db.disconnect
  destination_db.disconnect
  FileUtils.rm(SCHEMA_DUMP_PATH)

  puts "Toku: copied #{@global_object_count} elements across #{@tables_processed_count} tables " \
       "and that took #{(Time.now - begin_time_stamp).round(2)} seconds with #{THREADPOOL_SIZE} green threads"
  nil
end
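An end-to-end sketch; both URIs are hypothetical, and the destination database is dropped and recreated as part of the run:

  # Hypothetical URIs; #run drops and recreates the destination database.
  anonymizer = Toku::Anonymizer.new('config/toku.yml')
  anonymizer.run(
    'postgres://alice:secret@replica.example.com:5432/app_production',
    'postgres://alice:secret@localhost:5432/app_anonymized'
  )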
transform(row, table_name)
@param row [Hash]
@param table_name [Symbol]
@return [String]
# File lib/toku.rb, line 88
def transform(row, table_name)
  row.map do |row_key, row_value|
    @config[table_name.to_s]['columns'][row_key.to_s].inject(row_value) do |result, filter|
      if filter.is_a? Hash
        filter_class(column_filters, filter.keys.first.to_sym).new(
          result, filter.values.first
        ).call
      elsif filter.is_a? String
        filter_class(column_filters, filter.to_sym).new(result, {}).call
      end
    end
  end.to_csv
end
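For reference, transform and process_table read the parsed config as a per-table hash with a 'rows' list and a 'columns' hash whose values are lists of filters (plain names, or name => options hashes). A sketch of that shape after YAML loading; all table, column, and filter names are hypothetical:

  # Hypothetical shape of @config after YAML.load; names are illustrative only.
  {
    'users' => {
      'rows'    => ['drop_soft_deleted'],   # row filters: names or { name => options }
      'columns' => {
        'id'    => ['keep'],                # options default to {} for plain names
        'email' => [{ 'redact_email' => { 'domain' => 'example.com' } }]
      }
    }
  }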