class ActiveRecordCopy::EncodeForCopy
Constants
- NUMERIC_DEC_DIGITS
- NUMERIC_NBASE
- RANGE_LB_INC
From the PostgreSQL source, the binary range representation is: the first byte holds the flags, followed by the lower bound (if present) and then the upper bound (if present). Each bound is written as a 4-byte length header followed by the bound's binary representation (as returned by the subtype's send function).
- RANGE_LB_INF
- RANGE_UB_INC
- RANGE_UB_INF
Public Class Methods
new(options = {})
click to toggle source
# File lib/activerecord-copy/encode_for_copy.rb, line 22
#
# Builds an encoder for PostgreSQL's binary COPY format.
#
# @param options [Hash] encoder options; the keys used by the encoder are
#   :column_types (column index => type symbol such as :uuid or :jsonb),
#   :use_tempfile and :skip_unlink
def initialize(options = {})
  @options = options
  @column_types = options[:column_types] || {}
  @buffer = TempBuffer.new # scratch space for one encoded field at a time
  @io = nil                # created lazily by setup_io on the first add
  @closed = false
end
Public Instance Methods
add(row)
click to toggle source
# File lib/activerecord-copy/encode_for_copy.rb, line 30
#
# Appends one tuple to the COPY stream: a 16-bit field count followed by each
# encoded field. The output IO (and file header) is created on the first call.
#
# @param row [Array] column values, in column order
# @return [Array] +row+
def add(row)
  setup_io if @io.nil?
  @io.write([row.size].pack(PACKED_UINT_16))
  row.each_with_index do |value, position|
    encode_field(@buffer, value, position)
    # Move this field's bytes from the scratch buffer to the output, then reset it.
    unless @buffer.empty?
      @io.write(@buffer.read)
      @buffer.reopen
    end
  end
end
close()
click to toggle source
# File lib/activerecord-copy/encode_for_copy.rb, line 41
#
# Finishes the COPY stream. It flushes any pending field bytes, writes the
# trailer (a 16-bit -1 field count) and rewinds the IO so it can be read from
# the start.
#
# Calling close again is a no-op. Without that guard, a second call would
# write another trailer at the rewound position and overwrite the file
# signature. The "no rows" condition is checked explicitly instead of via a
# rescue modifier, so genuine write errors (e.g. IOError) are no longer
# misreported. A failed close also leaves the encoder unclosed.
#
# @return [Integer, nil] result of IO#rewind, or nil if already closed
# @raise [Exception] if no rows have been added (no IO was ever set up)
def close
  return if @closed
  raise Exception, 'No rows have been added to the encoder!' if @io.nil?

  @closed = true
  unless @buffer.empty?
    @io.write(@buffer.read)
    @buffer.reopen
  end
  @io.write([-1].pack(PACKED_UINT_16)) # file trailer
  @io.rewind
end
get_io()
click to toggle source
# File lib/activerecord-copy/encode_for_copy.rb, line 51
#
# Returns the finished COPY stream. If the encoder has not been closed yet,
# it is closed first, which also writes the trailer and rewinds the IO.
def get_io
  return @io if @closed

  close
  @io
end
remove()
click to toggle source
# File lib/activerecord-copy/encode_for_copy.rb, line 56
#
# Closes and deletes the backing Tempfile when the encoder wrote to one.
# Otherwise (StringIO output, or no output yet) does nothing.
def remove
  tempfile = @io
  return unless tempfile.is_a?(Tempfile)

  tempfile.close
  tempfile.unlink
end
Private Instance Methods
base10_to_base10000(intval)
click to toggle source
# File lib/activerecord-copy/encode_for_copy.rb, line 321
#
# Splits an integer into base-NUMERIC_NBASE digits, least significant first
# (e.g. 123_456_789 => [6789, 2345, 1]). Zero yields [0].
#
# Only the magnitude is split; the sign must be carried separately, as
# PostgreSQL's numeric format does. Previously a negative argument looped
# forever, because floor division keeps -1 at -1.
#
# @param intval [Integer]
# @return [Array<Integer>] digits, least significant first
def base10_to_base10000(intval)
  remaining = intval.abs
  digits = []
  loop do
    remaining, digit = remaining.divmod(NUMERIC_NBASE)
    digits << digit
    break if remaining.zero?
  end
  digits
end
encode_array(io, field, index)
click to toggle source
# File lib/activerecord-copy/encode_for_copy.rb, line 197
#
# Encodes a Ruby Array as a one-dimensional PostgreSQL array in binary form.
# The output is a length-prefixed payload holding the array header (ndim,
# has-null flag, element type OID, dimension length, lower bound) followed by
# one length-prefixed value per element.
#
# nil elements are dropped, so the encoded array never contains NULLs. The
# caller's array is no longer modified (it used to be compact!-ed in place).
# The element type is chosen from the first remaining element:
# - String elements use UUID_TYPE_OID when @column_types[index] is :uuid,
#   and VARCHAR_TYPE_OID otherwise.
# - Integer elements use INT_TYPE_OID with PACKED_UINT_32 values.
# An empty array, or one holding only nils, is written as SQL NULL.
#
# @param io [#write] destination for the encoded field
# @param field [Array] the array value (left unmodified)
# @param index [Integer] column position, used to look up @column_types
# @raise [Exception] if the first non-nil element is neither String nor Integer
def encode_array(io, field, index)
  elements = field.compact

  case elements.first
  when nil
    io.write([-1].pack(PACKED_UINT_32)) # SQL NULL
    return
  when String
    if @column_types[index] == :uuid
      element_oid = UUID_TYPE_OID
      encode_element = ->(val) { [val.delete('-')].pack(PACKED_HEX_STRING) }
    else
      element_oid = VARCHAR_TYPE_OID
      encode_element = ->(val) { val.to_s.encode(UTF_8_ENCODING) }
    end
  when Integer
    element_oid = INT_TYPE_OID
    encode_element = ->(val) { [val.to_i].pack(PACKED_UINT_32) }
  else
    raise Exception, 'Arrays support int or string only'
  end

  array_io = TempBuffer.new
  array_io.write([1].pack(PACKED_UINT_32))             # number of dimensions
  array_io.write([0].pack(PACKED_UINT_32))             # has-null flag (nils were removed)
  array_io.write([element_oid].pack(PACKED_UINT_32))   # element type OID
  array_io.write([elements.size].pack(PACKED_UINT_32)) # length of the only dimension
  array_io.write([1].pack(PACKED_UINT_32))             # lower bound (arrays are 1-based)
  elements.each { |val| write_field(array_io, encode_element.call(val)) }

  io.write([array_io.pos].pack(PACKED_UINT_32))
  io.write(array_io.string)
end
encode_based_on_input(io, field, index, depth)
click to toggle source
# File lib/activerecord-copy/encode_for_copy.rb, line 143
#
# Fallback encoder used when a column has no recognized declared type. It
# picks the wire format from the Ruby class of +field+:
# - Integer: PACKED_UINT_32
# - Float: PACKED_FLOAT_64
# - true/false: a PACKED_UINT_8 of 1/0
# - String: text encoded with UTF_8_ENCODING
# - Hash: hstore payload; keys are stringified, and values are stringified
#   unless nil
# - Time: :timestamp
# - Date: :date
# - IPAddr: inet
# - Range: int4range / numrange / tstzrange / daterange, chosen from the
#   class of the range's first element
#
# @param io [#write] destination buffer
# @param field [Object] non-nil column value
# @param index [Integer] column position (used for hstore values and in errors)
# @param depth [Integer] nesting level; a Hash is rejected when depth > 0
# @raise [Exception] for unsupported classes or range element types
def encode_based_on_input(io, field, index, depth)
  case field
  when Integer
    write_field(io, [field].pack(PACKED_UINT_32))
  when Float
    write_field(io, [field].pack(PACKED_FLOAT_64))
  when true, false
    write_field(io, [field ? 1 : 0].pack(PACKED_UINT_8))
  when String
    write_field(io, field.encode(UTF_8_ENCODING))
  when Hash
    raise Exception, "Hash's can't contain hashes" if depth > 0

    pairs = TempBuffer.new
    pairs.write([field.size].pack(PACKED_UINT_32)) # number of key/value pairs
    field.each_pair do |key, value|
      write_field(pairs, key.to_s.encode(UTF_8_ENCODING))
      encode_field(pairs, value.nil? ? nil : value.to_s, index, depth + 1)
    end
    io.write([pairs.pos].pack(PACKED_UINT_32)) # size of hstore data
    io.write(pairs.string)
  when Time
    write_simple_field(io, field, :timestamp)
  when Date
    write_simple_field(io, field, :date)
  when IPAddr
    encode_ip_addr(io, field)
  when Range
    subtype_range =
      case field.begin
      when Integer then :int4range
      when Float then :numrange
      when Time then :tstzrange
      when Date then :daterange
      else
        raise Exception, "Unsupported range input type #{field.begin.class.name} for index #{index}"
      end
    encode_range(io, field, subtype_range)
  else
    raise Exception, "Unsupported Format: #{field.class.name}"
  end
end
encode_field(io, field, index, depth = 0)
click to toggle source
# File lib/activerecord-copy/encode_for_copy.rb, line 109
#
# Encodes one column value into +io+ as a length-prefixed binary COPY field.
#
# Dispatch order:
# - nil always becomes SQL NULL.
# - Arrays are encoded as PostgreSQL arrays, unless the column is
#   :json/:jsonb, where they are serialized as JSON.
# - Otherwise the declared type in @column_types[index] selects the encoding.
# - Columns without a recognized declared type fall back to
#   encode_based_on_input, which infers the format from the value's class.
#
# @param io [#write] destination buffer
# @param field [Object] the column value
# @param index [Integer] column position, used to look up @column_types
# @param depth [Integer] nesting level (non-zero only for hstore values)
def encode_field(io, field, index, depth = 0)
  # Any column type may carry a NULL, transmitted as a -1 length.
  if field.nil?
    io.write([-1].pack(PACKED_UINT_32))
    return
  end

  column_type = @column_types[index]

  if field.is_a?(Array) && ![:json, :jsonb].include?(column_type)
    encode_array(io, field, index)
    return
  end

  case column_type
  when :bigint, :integer, :smallint, :numeric, :float
    write_simple_field(io, field, column_type)
  when :uuid
    write_field(io, [field.delete('-')].pack(PACKED_HEX_STRING))
  when :inet
    # IPAddr.new only parses Strings; given an IPAddr it raises because no
    # address family is passed. Use IPAddr values directly.
    encode_ip_addr(io, field.is_a?(IPAddr) ? field : IPAddr.new(field))
  when :binary
    write_field(io, field)
  when :json
    write_field(io, field.to_json.encode(UTF_8_ENCODING))
  when :jsonb
    encode_jsonb(io, field)
  when :int4range, :int8range, :numrange, :tsrange, :tstzrange, :daterange
    encode_range(io, field, column_type)
  else
    encode_based_on_input(io, field, index, depth)
  end
end
encode_ip_addr(io, ip_addr)
click to toggle source
# File lib/activerecord-copy/encode_for_copy.rb, line 253
#
# Writes +ip_addr+ in PostgreSQL's binary inet layout:
# - a 4-byte field length;
# - family, netmask bits, is-cidr flag and address length, one byte each;
# - the address bytes in network order (IPAddr#hton).
#
# The netmask is taken from the IPAddr's prefix, so IPAddr.new('10.0.0.0/8')
# is sent as 10.0.0.0/8 rather than being widened to a /32 host address.
#
# @param io [#write] destination buffer
# @param ip_addr [IPAddr] address to encode
def encode_ip_addr(io, ip_addr)
  if ip_addr.ipv6?
    family = 3 # PGSQL_AF_INET6
    address_bytes = 16
  else
    family = 2 # PGSQL_AF_INET
    address_bytes = 4
  end
  # IPAddr#prefix exists since Ruby 2.5; older versions fall back to a host mask.
  bits = ip_addr.respond_to?(:prefix) ? ip_addr.prefix : address_bytes * 8

  io.write([4 + address_bytes].pack(PACKED_UINT_32)) # Field data size
  io.write([family].pack(PACKED_UINT_8))             # Family
  io.write([bits].pack(PACKED_UINT_8))               # Netmask bits
  io.write([0].pack(PACKED_UINT_8))                  # Is CIDR? => No
  io.write([address_bytes].pack(PACKED_UINT_8))      # Address length in bytes
  io.write(ip_addr.hton)
end
encode_jsonb(io, field)
click to toggle source
# File lib/activerecord-copy/encode_for_copy.rb, line 313
#
# Writes +field+ as binary jsonb: a length prefix covering a one-byte format
# version (1) plus the JSON text from #to_json, encoded with UTF_8_ENCODING.
def encode_jsonb(io, field)
  document = field.to_json.encode(UTF_8_ENCODING)
  io.write([document.bytesize + 1].pack(PACKED_UINT_32)) # version byte + JSON text
  io.write([1].pack(PACKED_UINT_8))                      # JSONB format version 1
  io.write(document)
end
encode_numeric(io, field)
click to toggle source
# File lib/activerecord-copy/encode_for_copy.rb, line 333
#
# Writes +field+ in PostgreSQL's binary numeric layout:
# - a 4-byte field length;
# - four 16-bit header words: ndigits, weight, sign, dscale;
# - ndigits base-10000 digits, most significant first.
# +weight+ is the power of 10000 of the first digit; +dscale+ is the number
# of decimal digits after the point.
#
# Accepts any value whose #to_s is a plain or exponent-notation decimal
# (Integer, Float, BigDecimal), e.g. "12.5", "1.0e-05", "0.125e2".
#
# Fixes over the previous implementation:
# - Negative values no longer hang, and they get the correct dscale.
# - Integer parts >= 10000 are emitted most significant digit first.
# - Leading zeros in the fraction are kept (1.00005 no longer becomes 1.5).
# - No spurious zero digit is added when the fraction length is a multiple
#   of NUMERIC_DEC_DIGITS.
# - Integers and exponent notation are supported.
#
# @raise [Exception] for values without a finite decimal form (NaN, Infinity, ...)
def encode_numeric(io, field)
  unsigned = numeric_plain_string(field).sub(/\A-/, '')
  int_part, frac_part = unsigned.split('.', 2)
  frac_part ||= ''
  sign = field < 0 ? 0x4000 : 0 # NUMERIC_NEG / NUMERIC_POS
  dscale = frac_part.size

  # Right-pad with zeroes so the fraction splits into whole base-10000 digits.
  frac_part += '0' * ((NUMERIC_DEC_DIGITS - frac_part.size % NUMERIC_DEC_DIGITS) % NUMERIC_DEC_DIGITS)

  # base10_to_base10000 returns the least significant digit first.
  digits_before_decpoint = base10_to_base10000(int_part.to_i).reverse
  # Slice the fraction as text so leading zero groups ("0000") are preserved.
  digits_after_decpoint = frac_part.chars.each_slice(NUMERIC_DEC_DIGITS).map { |group| group.join.to_i }
  weight = digits_before_decpoint.size - 1
  digits = digits_before_decpoint + digits_after_decpoint

  io.write([2 * (4 + digits.size)].pack(PACKED_UINT_32)) # Field data size: 4 header words + digits
  io.write([digits.size].pack(PACKED_UINT_16))           # ndigits
  io.write([weight].pack(PACKED_UINT_16))                # weight
  io.write([sign].pack(PACKED_UINT_16))                  # sign
  io.write([dscale].pack(PACKED_UINT_16))                # dscale
  digits.each { |d| io.write([d].pack(PACKED_UINT_16)) } # NumericDigits
end

# Renders +field+ as a plain decimal string such as "-123.45" or "100."
# (no exponent), by expanding any exponent in its #to_s form.
#
# @raise [Exception] if #to_s is not a finite decimal number
def numeric_plain_string(field)
  match = /\A(-?)(\d+)(?:\.(\d+))?(?:e([-+]?\d+))?\z/.match(field.to_s.downcase)
  raise Exception, "Unsupported numeric value: #{field}" unless match

  sign, int_digits, frac_digits, exponent = match.captures
  digits = int_digits + frac_digits.to_s
  point = int_digits.size + exponent.to_i # position of the decimal point within +digits+
  if point <= 0
    digits = '0' * (1 - point) + digits
    point = 1
  elsif point > digits.size
    digits += '0' * (point - digits.size)
  end
  "#{sign}#{digits[0, point]}.#{digits[point..-1]}"
end
encode_range(io, range, range_type)
click to toggle source
# File lib/activerecord-copy/encode_for_copy.rb, line 279
#
# Writes +range+ in PostgreSQL's binary range layout: a length prefix, one
# flags byte, then each present bound as a length-prefixed value of the
# range's subtype (encoded by write_simple_field).
#
# A bound is treated as absent when it is nil (beginless/endless Ruby ranges)
# or responds to #infinite? with a truthy value (e.g. Float::INFINITY). Absent
# bounds are flagged with RANGE_LB_INF / RANGE_UB_INF and not written.
# Previously a nil bound was skipped without setting its flag, which produced
# a malformed payload.
#
# @param io [#write] destination buffer
# @param range [Range] value to encode
# @param range_type [Symbol] one of :int4range, :int8range, :numrange,
#   :tsrange, :tstzrange, :daterange
# @raise [Exception] for any other +range_type+
def encode_range(io, range, range_type)
  field_data_type =
    case range_type
    when :int4range then :integer
    when :int8range then :bigint
    when :numrange then :numeric
    when :tsrange then :timestamp
    when :tstzrange then :timestamptz
    when :daterange then :date
    else raise Exception, "Unsupported range type: #{range_type}"
    end

  unbounded = ->(bound) { bound.nil? || (bound.respond_to?(:infinite?) && bound.infinite?) }
  lower_unbounded = unbounded.call(range.begin)
  upper_unbounded = unbounded.call(range.end)

  flags = RANGE_LB_INC # Ruby ranges always include the lower bound
  flags |= RANGE_UB_INC unless range.exclude_end?
  flags |= RANGE_LB_INF if lower_unbounded
  flags |= RANGE_UB_INF if upper_unbounded

  tmp_io = IntermediateBuffer.new
  tmp_io.write([flags].pack(PACKED_UINT_8))
  write_simple_field(tmp_io, range.begin, field_data_type) unless lower_unbounded
  write_simple_field(tmp_io, range.end, field_data_type) unless upper_unbounded

  io.write([tmp_io.size].pack(PACKED_UINT_32))
  io.write(tmp_io.bytes)
end
setup_io()
click to toggle source
# File lib/activerecord-copy/encode_for_copy.rb, line 65
#
# Creates the output IO and writes the binary COPY file header.
#
# When options[:use_tempfile] == true, output goes to an 'ascii-8bit'
# Tempfile, which is unlinked immediately unless options[:skip_unlink] == true.
# Otherwise output is kept in a StringIO.
def setup_io
  use_tempfile = @options[:use_tempfile] == true
  @io = use_tempfile ? Tempfile.new('copy_binary', encoding: 'ascii-8bit') : StringIO.new
  @io.unlink if use_tempfile && @options[:skip_unlink] != true

  # Header: the 11-byte signature, then a 32-bit flags field and a 32-bit
  # header-extension length, both zero.
  @io.write("PGCOPY\n\377\r\n\0")
  @io.write([0, 0].pack(PACKED_UINT_32 + PACKED_UINT_32))
end
write_field(io, buf)
click to toggle source
# File lib/activerecord-copy/encode_for_copy.rb, line 76
#
# Writes one COPY field: the byte length of +buf+ packed with PACKED_UINT_32,
# followed by the raw bytes of +buf+.
def write_field(io, buf)
  length_prefix = [buf.bytesize].pack(PACKED_UINT_32)
  io.write(length_prefix)
  io.write(buf)
end
write_simple_field(io, field, type)
click to toggle source
Encodes primitive types, which can also appear inside ranges, arrays, etc.
# File lib/activerecord-copy/encode_for_copy.rb, line 82
#
# Encodes a scalar of the given column +type+ as one length-prefixed field:
# - :bigint / :integer / :smallint: coerced with #to_i and packed with
#   PACKED_UINT_64 / PACKED_UINT_32 / PACKED_UINT_16.
# - :float: packed with PACKED_FLOAT_64.
# - :timestamp / :timestamptz: tv_sec * 1_000_000 + tv_usec minus
#   POSTGRES_EPOCH_TIME.
# - :date: whole days since 2000-01-01.
# - :numeric: delegated to encode_numeric.
#
# @raise [Exception] for any other +type+
def write_simple_field(io, field, type)
  return encode_numeric(io, field) if type == :numeric

  payload =
    case type
    when :bigint then [field.to_i].pack(PACKED_UINT_64)
    when :integer then [field.to_i].pack(PACKED_UINT_32)
    when :smallint then [field.to_i].pack(PACKED_UINT_16)
    when :float then [field].pack(PACKED_FLOAT_64)
    when :timestamp, :timestamptz
      microseconds = field.tv_sec * 1_000_000 + field.tv_usec
      [(microseconds - POSTGRES_EPOCH_TIME).to_i].pack(PACKED_UINT_64)
    when :date
      [(field - Date.new(2000, 1, 1)).to_i].pack(PACKED_UINT_32)
    else
      raise Exception, "Unsupported simple type: #{type}"
    end
  write_field(io, payload)
end