class ActiveRecordCopy::Decoder

Public Class Methods

new(options = {}) click to toggle source
# File lib/activerecord-copy/decoder.rb, line 6
# Creates a decoder.
#
# options - Hash of settings. Keys read by this class:
#           :column_types - Array of type Symbols (positional) or a Hash
#                           mapping column index => type Symbol. An Array is
#                           normalised to the Hash form; a missing value
#                           becomes an empty Hash (no per-column mapping).
#           :file / :io   - the input source, consulted lazily on first read.
#
# The caller's hash is never modified: normalisation happens on a shallow
# copy, so frozen or shared option hashes are safe to pass in.
def initialize(options = {})
  @options = options.dup
  @closed = false

  column_types = @options[:column_types]
  @options[:column_types] =
    if column_types.is_a?(Array)
      # Positional list -> { index => type } lookup table.
      column_types.each_with_index.map { |type, index| [index, type] }.to_h
    else
      column_types || {}
    end

  # Opened on demand by the first read.
  @io = nil
end

Public Instance Methods

each() { |result| ... } click to toggle source
# File lib/activerecord-copy/decoder.rb, line 46
# Yields every remaining row (an Array of decoded fields) to the block.
#
# Iteration stops when read_line returns nil or once the decoder has been
# marked closed. Without a block an Enumerator over the same rows is
# returned instead of failing on the first yield.
def each
  return enum_for(:each) unless block_given?

  loop do
    row = read_line
    break unless row

    yield row
    break if @closed
  end
end
read_line() click to toggle source
# File lib/activerecord-copy/decoder.rb, line 21
# Reads and decodes the next row.
#
# Returns an Array with one entry per column: nil for a NULL field, the
# result of map_field when a type is configured for that column index in
# @options[:column_types], and the raw bytes otherwise. Returns nil once the
# end-of-data marker (a field count of 65535) has been seen, and on every
# call after that.
#
# Raises EOFError when the stream ends before a complete field-count word
# could be read, i.e. the data was truncated before the end marker.
def read_line
  return nil if @closed

  setup_io unless @io

  count_word = @io.read(2)
  if count_word.nil? || count_word.bytesize < 2
    raise EOFError, 'unexpected end of COPY data while reading the field count'
  end

  column_count = count_word.unpack(PACKED_UINT_16).first
  if column_count == 65_535
    # Trailer word: no more rows follow.
    @closed = true
    return nil
  end

  column_types = @options[:column_types]
  Array.new(column_count) do |index|
    field = decode_field(@io)
    type = column_types[index]
    if field.nil? || !type
      field
    else
      map_field(field, type)
    end
  end
end

Private Instance Methods

decode_field(io) click to toggle source
# File lib/activerecord-copy/decoder.rb, line 74
# Reads a single length-prefixed field from +io+.
#
# The field starts with a 4-byte length word. A word with every bit set
# (0xFFFFFFFF) stands for NULL and yields nil; any other value is the number
# of payload bytes, which are read and returned as-is.
def decode_field(io)
  length_word = io.read(4)
  payload_size = length_word.unpack(PACKED_UINT_32).first
  return nil if payload_size == 0xFFFF_FFFF

  io.read(payload_size)
end
map_field(data, type) click to toggle source
# File lib/activerecord-copy/decoder.rb, line 84
# Converts the raw payload of one non-NULL field into a Ruby value.
#
# data - binary String with the field payload (length prefix already removed)
# type - Symbol naming the column type configured for this column
#
# Integer-like values (:int, :bigint, :date, :time and integer array
# elements) are decoded as *signed* big-endian numbers, so negative values
# and dates/timestamps before 2000-01-01 round-trip correctly.
#
# Returns the decoded value. Raises RuntimeError for an unknown +type+, for
# an array whose element type is not supported, or for trailing hstore bytes.
def map_field(data, type)
  case type
  when :int, :integer
    # 'l>' = signed 32-bit, big-endian.
    data.unpack('l>').first
  when :bytea
    data
  when :bigint
    # 'q>' = signed 64-bit, big-endian.
    data.unpack('q>').first
  when :float, :double
    data.unpack(PACKED_FLOAT_64).first
  when :boolean
    data.unpack(PACKED_UINT_8).first == 1
  when :string, :text, :character
    data.force_encoding(UTF_8_ENCODING)
  when :json
    # NOTE(review): JSON.load is the permissive entry point of the json
    # library; if this data can come from an untrusted source, confirm that
    # JSON.parse would not be the safer choice here.
    JSON.load(data)
  when :uuid
    format_uuid(data.unpack('H*').first)
  when :uuid_raw
    data.unpack('H*').first
  when :array, :"integer[]", :"uuid[]", :"character[]"
    decode_array_field(data)
  when :hstore, :hash
    decode_hstore_field(data)
  when :time, :timestamp
    # Signed 64-bit offset from the PostgreSQL epoch; the division below
    # treats the sum as microseconds.
    micros = data.unpack('q>').first
    Time.at((micros + POSTGRES_EPOCH_TIME) / 1_000_000.0).utc
  when :date
    # Signed 32-bit day offset relative to 2000-01-01.
    days = data.unpack('l>').first
    Date.jd(days + Date.new(2000, 1, 1).jd)
  else
    raise "Unsupported format #{type}"
  end
end

# Formats 32 hex digits as the canonical 8-4-4-4-12 UUID string.
def format_uuid(hex)
  "#{hex[0..7]}-#{hex[8..11]}-#{hex[12..15]}-#{hex[16..19]}-#{hex[20..-1]}"
end

# Decodes a one-dimensional binary array payload into a Ruby Array.
#
# Supports UUID, text/varchar and integer element types; NULL elements
# (length word 0xFFFFFFFF) become nil. Returns [] when the payload ends
# right after the element type word. Raises RuntimeError for any other
# element type.
def decode_array_field(data)
  io = StringIO.new(data)
  io.read(4) # not used; presumably the dimension count - confirm against the PostgreSQL array format
  io.read(4) # not used; presumably the "has NULLs" flag word - confirm
  element_type_word = io.read(4)
  return [] if element_type_word.nil?

  element_type = element_type_word.unpack(PACKED_UINT_32).first
  return [] if io.pos == io.size

  element_count = io.read(4).unpack(PACKED_UINT_32).first
  io.read(4) # not used; the original code expected this word to be 1

  element_decoder =
    case element_type
    when UUID_TYPE_OID
      ->(bytes) { format_uuid(bytes.unpack(PACKED_HEX_STRING).first) }
    when TEXT_TYPE_OID, VARCHAR_TYPE_OID
      ->(bytes) { bytes }
    when INT_TYPE_OID
      ->(bytes) { bytes.unpack('l>').first }
    else
      raise "Unsupported Array type #{element_type}"
    end

  Array.new(element_count) do
    element_size = io.read(4).unpack(PACKED_UINT_32).first
    # An all-ones length word marks a NULL element with no payload bytes.
    element_size == 0xFFFF_FFFF ? nil : element_decoder.call(io.read(element_size))
  end
end

# Decodes a binary hstore payload into a Hash of UTF-8 String keys to UTF-8
# String values (nil for NULL values). Raises RuntimeError if bytes remain
# after the declared number of pairs has been read.
def decode_hstore_field(data)
  io = StringIO.new(data)
  pair_count = io.read(4).unpack(PACKED_UINT_32).first
  pairs = {}

  pair_count.times do
    key_size = io.read(4).unpack(PACKED_UINT_32).first
    key = io.read(key_size).force_encoding("UTF-8")
    value_size = io.read(4).unpack(PACKED_UINT_32).first
    value = value_size == 0xFFFF_FFFF ? nil : io.read(value_size)
    value = value.force_encoding("UTF-8") unless value.nil?
    pairs[key] = value
  end

  raise "remaining hstore bytes!" if io.pos != io.size
  pairs
end
setup_io() click to toggle source
# File lib/activerecord-copy/decoder.rb, line 57
# Prepares @io for reading rows.
#
# Uses @options[:file] (a path, opened read-only in binary mode) when given,
# otherwise the ready-made IO object in @options[:io]. The stream must begin
# with the 11-byte PGCOPY signature; the two 4-byte header words that follow
# are consumed, together with any header extension bytes the second word
# announces, so that the next read starts at the first row.
#
# Raises RuntimeError ('NO io present') when neither source is configured
# and RuntimeError ('invalid format') when the signature does not match.
def setup_io
  if @options[:file]
    # 'rb' requests binary mode (binary encoding, no newline translation),
    # which an encoding-only mode string does not guarantee on every platform.
    @io = File.open(@options[:file], 'rb')
  elsif !@options[:io].nil?
    @io = @options[:io]
  else
    raise 'NO io present'
  end

  signature = "PGCOPY\n\377\r\n\0".b
  raise 'invalid format' if @io.read(signature.bytesize) != signature

  @io.read(4) # flags word, not interpreted here
  extension_word = @io.read(4)
  # Length of the header extension area; a missing or short word counts as 0.
  extension_size = extension_word ? extension_word.unpack(PACKED_UINT_32).first.to_i : 0
  @io.read(extension_size) if extension_size > 0
end