class MySql

Constants

MYSQL_BATCH_SEP

Public Class Methods

avro_type(mysql_type)
# File lib/interface/mysql.rb, line 162
# Maps a MySQL column type, as reported by DESCRIBE (e.g. "int(10) unsigned",
# "varchar(255)", "enum('a','b')"), to the name of the Avro primitive type
# used to encode values of that column.
#
# Only the leading type keyword is inspected. Text inside the parameter list,
# such as enum/set member values, therefore cannot change the result; a
# substring match would have mapped enum('print') to 'int'.
#
# Raises RuntimeError ("Unsupported MySQL data type: ...") for types with no
# mapping.
def self.avro_type(mysql_type)
  # Refer to https://github.com/apache/sqoop/blob/trunk/src/java/org/apache/sqoop/manager/ConnManager.java#L172.
  normalized = mysql_type.to_s.strip.downcase
  base = normalized[/\A[a-z]+/]
  unsigned = normalized.split.include?('unsigned')

  case base

  # See https://dev.mysql.com/doc/refman/5.0/en/numeric-type-overview.html
  when 'bool', 'boolean'
    'boolean'
  when 'tinyint'
    # TINYINT(1) is the column type MySQL uses for BOOL/BOOLEAN.
    normalized.start_with?('tinyint(1)') ? 'boolean' : 'int'
  when 'bigint', 'serial'
    # NOTE: BIGINT UNSIGNED values above 2**63 - 1 do not fit in an Avro long.
    'long'
  when 'int', 'integer'
    # INT UNSIGNED reaches 4294967295, beyond Avro's signed 32-bit int.
    unsigned ? 'long' : 'int'
  when 'smallint', 'mediumint'
    # Even the UNSIGNED variants of these fit in a signed 32-bit int.
    'int'
  when 'decimal', 'dec'
    # Kept as text to avoid losing precision.
    'string'
  when 'float'
    'float'
  when 'double'
    'double'

  # See https://dev.mysql.com/doc/refman/5.0/en/date-and-time-type-overview.html.
  when 'date', 'datetime', 'time', 'timestamp'
    'string'
  when 'year'
    'int'

  # See https://dev.mysql.com/doc/refman/5.0/en/string-type-overview.html.
  when 'char', 'varchar', 'tinytext', 'text', 'mediumtext', 'longtext', 'enum', 'set'
    'string'
  when 'binary', 'varbinary', 'tinyblob', 'blob', 'mediumblob', 'longblob'
    'bytes'
  else
    raise "Unsupported MySQL data type: #{mysql_type}"
  end
end
new(config)

config is a Hash of connection settings, shaped like an ActiveRecord database configuration. The keys are read as strings:

{
  "host"     => "localhost",
  "username" => "myuser",
  "password" => "mypass",
  "database" => "somedatabase"
}

# File lib/interface/mysql.rb, line 15
# Stores the connection settings later passed to MySql.query.
#
# config is a Hash with "host", "database", "username" and "password"
# entries. String keys are checked first. Symbol keys (host:, database:, ...)
# are accepted as a fallback, so an ActiveRecord-style literal such as
# { host: "localhost" } no longer silently yields nil settings.
def initialize(config)
  setting = ->(key) { config[key] || config[key.to_sym] }

  @db_host = setting.call('host')
  @db_name = setting.call('database')
  @username = setting.call('username')
  @password = setting.call('password')
end
query(sql, db_host, db_name, username, password, &block)
# File lib/interface/mysql.rb, line 139
# Runs +sql+ through the mysql command-line client in batch mode. Each line of
# output is split on MYSQL_BATCH_SEP and the resulting Array of fields is
# passed to +block+. The first row yielded is the column-header line.
#
# The client is spawned from an argv list, not a shell string. Quotes,
# backticks and "$" in the SQL or in the credentials therefore reach mysql
# verbatim instead of being interpreted by /bin/sh.
#
# NOTE(review): mysql --batch escapes special characters in values and prints
# SQL NULL as the bare word NULL; rows are yielded without unescaping.
#
# Raises RuntimeError if the client exits unsuccessfully. The message includes
# the SQL and the client's stderr, but never the password.
def self.query(sql, db_host, db_name, username, password, &block)
  args = ['mysql', '--batch', "--execute=SET NAMES 'utf8'; #{sql}"]
  args << "--host=#{db_host}" unless db_host.nil?
  args << "--user=#{username}" unless username.nil?
  args << "--password=#{password}"
  args << '--quick'
  args << db_name.to_s unless db_name.nil?

  Open3.popen3(*args) do |stdin, stdout, stderr, wait_thr|
    stdin.close

    # Drain stderr concurrently so the client cannot block on a full stderr
    # pipe while we are still reading stdout. IOError is expected if a caller
    # returns from +block+ early and popen3 closes the pipe under us.
    err_reader = Thread.new do
      begin
        stderr.read
      rescue IOError
        ''
      end
    end

    while (line = stdout.gets)
      # chomp (not chop) so a final line without "\n" keeps its last
      # character; limit -1 keeps trailing empty fields.
      block.call(line.chomp.split(MYSQL_BATCH_SEP, -1))
    end

    status = wait_thr.value
    err_output = err_reader.value
    unless status.success?
      raise "Error querying mysql\n\nQuery: #{sql}\nSTDERR: #{err_output}\nSTATUS: #{status}"
    end
  end
end

Public Instance Methods

avro_types(table)
# File lib/interface/mysql.rb, line 124
# Returns a Hash mapping each column name of +table+ to its Avro type name.
# The MySQL types come from sql_schema and are translated by MySql.avro_type;
# the columns keep the order sql_schema returned them in.
def avro_types(table)
  sql_schema(table).each_with_object({}) do |(column, mysql_type), mapping|
    mapping[column] = MySql.avro_type(mysql_type)
  end
end
data(table, min_id, max_id, &block)
# File lib/interface/mysql.rb, line 50
# Streams the rows of +table+ whose id lies in [min_id, max_id] (inclusive)
# and yields each one to +block+ as a Hash of column name => Avro JSON value.
#
# All non-null values are wrapped in a mapping from type to value, because the
# Avro JSON encoding of a ["null", type] union requires it; see:
#  - http://avro.apache.org/docs/current/spec.html#json_encoding
#  - http://mail-archives.apache.org/mod_mbox/avro-user/201304.mbox/%3CCD86687D.E892E%25scott@richrelevance.com%3E
# A field printed as NULL becomes nil.
#
# NOTE(review): a string column whose value is the literal text "NULL" cannot
# be told apart from SQL NULL in this output. table, min_id and max_id are
# interpolated into the SQL unescaped, so callers must pass trusted values.
#
# Raises RuntimeError ("Unsupported type: ...") when a column's Avro type has
# no conversion here.
def data(table, min_id, max_id, &block)
  types = avro_types(table)
  columns = nil

  sql = <<-SQL
    SELECT *
    FROM #{table}
    WHERE id >= #{min_id}
      AND id <= #{max_id}
  SQL

  query(sql) do |line|
    # The first line is the column header.
    if columns.nil?
      columns = line
      next
    end

    # Construct row mapping column names to values of appropriate type.
    row = (0...columns.length).each_with_object({}) do |i, h|
      colname = columns[i]
      # String#split without a negative limit drops trailing empty fields,
      # so a missing field stands for an empty value.
      value = line[i] || ''

      if value == 'NULL'
        h[colname] = nil
        next
      end

      type = types[colname]
      h[colname] = case type
      when 'boolean'
        # Any non-zero value is true (MySQL stores TRUE as 1).
        { type => !value.to_i.zero? }
      when 'int', 'long'
        { type => value.to_i }
      when 'float', 'double'
        { type => value.to_f }
      when 'bytes', 'string'
        { type => value }
      else
        raise "Unsupported type: #{type}"
      end
    end

    block.call(row)
  end
end
max_id(table)
# File lib/interface/mysql.rb, line 38
# Returns MAX(id) of +table+ as an Integer. For an empty table MySQL reports
# NULL, which String#to_i turns into 0.
def max_id(table)
  rows_seen = 0
  query("SELECT MAX(id) FROM #{table}") do |fields|
    rows_seen += 1
    # The first row is the column header ("MAX(id)"); the value follows it.
    next if rows_seen == 1

    return fields.first.to_i
  end
end
query(sql, &block)
# File lib/interface/mysql.rb, line 135
# Instance-level wrapper around MySql.query. It passes the connection settings
# stored by the constructor and forwards +block+, which receives each output
# row.
def query(sql, &block)
  connection = [@db_host, @db_name, @username, @password]
  MySql.query(sql, *connection, &block)
end
schema(table)
# File lib/interface/mysql.rb, line 22
# Builds an Avro record schema (as a Ruby Hash) for +table+. Every column
# becomes a field whose type is the union ['null', <avro type>], so any
# column may hold NULL.
def schema(table)
  fields = avro_types(table).map do |column, avro|
    { name: column, type: ['null', avro] }
  end

  { type: "record", name: table, fields: fields }
end
sql_schema(table)
# File lib/interface/mysql.rb, line 107
# Runs DESCRIBE on +table+ and returns a Hash mapping each column name (first
# field of a row) to its MySQL type string (second field), in the order
# DESCRIBE lists them. The header row is skipped.
def sql_schema(table)
  columns = {}
  header_pending = true

  query("DESCRIBE #{table}") do |fields|
    if header_pending
      header_pending = false
    else
      column_name, column_type = fields
      columns[column_name] = column_type
    end
  end

  columns
end