class MySql
Constants
- MYSQL_BATCH_SEP
Public Class Methods
avro_type(mysql_type)
click to toggle source
# File lib/interface/mysql.rb, line 162
# Maps a MySQL column type declaration (e.g. "int(11)", "varchar(255)",
# "enum('a','b')") to the name of the Avro primitive type used to hold it.
#
# The type is recognised by its leading keyword instead of by searching the
# whole declaration, so text that merely appears inside it (ENUM/SET member
# names, the "int" inside "point") can no longer select a wrong mapping.
#
# Refer to https://github.com/apache/sqoop/blob/trunk/src/java/org/apache/sqoop/manager/ConnManager.java#L172.
#
# @param mysql_type [String] column type declaration
# @return [String] 'boolean', 'int', 'long', 'float', 'double', 'string' or 'bytes'
# @raise [RuntimeError] when the declaration does not start with a supported type
def self.avro_type(mysql_type)
  declared = mysql_type.to_s.strip.downcase

  # BOOL/BOOLEAN are MySQL synonyms for TINYINT(1); wider TINYINTs stay 'int'.
  return 'boolean' if declared.start_with?('tinyint(1)')

  case declared[/\A[a-z]+/]
  # See https://dev.mysql.com/doc/refman/5.0/en/numeric-type-overview.html
  when 'bool', 'boolean'
    'boolean'
  when 'bigint', 'serial'
    'long'
  when 'tinyint', 'smallint', 'mediumint', 'integer', 'int'
    'int'
  when 'decimal', 'dec'
    'string'
  when 'float'
    'float'
  when 'double'
    'double'
  # See https://dev.mysql.com/doc/refman/5.0/en/date-and-time-type-overview.html.
  when 'date', 'datetime', 'time', 'timestamp'
    'string'
  when 'year'
    'int'
  # See https://dev.mysql.com/doc/refman/5.0/en/string-type-overview.html.
  when 'char', 'varchar', 'tinytext', 'text', 'mediumtext', 'longtext', 'enum', 'set'
    'string'
  when 'binary', 'varbinary', 'tinyblob', 'blob', 'mediumblob', 'longblob'
    'bytes'
  else
    raise "Unsupported MySQL data type: #{mysql_type}"
  end
end
new(config)
click to toggle source
config is a hash of connection settings with string keys, as in a database.yml entry loaded for ActiveRecord: {
"host" => "localhost", "username" => "myuser", "password" => "mypass", "database" => "somedatabase"
}
# File lib/interface/mysql.rb, line 15
# Remembers the connection settings used by every later query.
#
# Each setting is looked up under its string key first and then under the
# equivalent symbol key, so both { 'host' => ... } and { host: ... } work.
#
# @param config [Hash] settings under the keys host, database, username
#   and password (string or symbol keys)
def initialize(config)
  @db_host  = config['host']     || config[:host]
  @db_name  = config['database'] || config[:database]
  @username = config['username'] || config[:username]
  @password = config['password'] || config[:password]
end
query(sql, db_host, db_name, username, password, &block)
click to toggle source
# File lib/interface/mysql.rb, line 139
# Runs +sql+ through the `mysql` command-line client in --batch mode and
# passes every output line, split on MYSQL_BATCH_SEP, to the block. The
# first line passed is whatever the client prints first (callers in this
# class treat it as the column header).
#
# The client is started from an argument list, not a shell command string,
# so quotes, backticks, `$` and spaces in the SQL or the credentials reach
# mysql unchanged instead of being interpreted by a shell.
#
# @param sql [String] statement(s) to execute
# @param db_host [String] server host
# @param db_name [String, nil] database to select (omitted when blank)
# @param username [String] user to connect as
# @param password [String] password for that user
# @yield [fields] one Array of String fields per output line
# @raise [RuntimeError] when the mysql client exits unsuccessfully
def self.query(sql, db_host, db_name, username, password, &block)
  cmd = [
    'mysql',
    '--batch',
    "--execute=SET NAMES 'utf8'; #{sql}",
    '--host', db_host.to_s,
    '--user', username.to_s,
    "--password=#{password}",
    '--quick'
  ]
  # A blank database name used to vanish during shell word-splitting; keep
  # that behaviour rather than passing an empty argument.
  cmd << db_name.to_s unless db_name.to_s.empty?

  Open3.popen3(*cmd) do |stdin, stdout, stderr, wait_thr|
    stdin.close
    while (line = stdout.gets)
      # chomp only strips a line terminator; the -1 limit keeps trailing
      # empty fields so every row has as many fields as the header.
      block.call(line.chomp.split(MYSQL_BATCH_SEP, -1))
    end

    status = wait_thr.value
    unless status.success?
      # Keep the password out of exception messages and logs.
      shown = cmd.map { |arg| arg.start_with?('--password=') ? '--password=[FILTERED]' : arg }.join(' ')
      raise "Error querying mysql\n\nQuery: #{shown}\nSTDERR: #{stderr.read}\nSTATUS: #{status}"
    end
  end
end
Public Instance Methods
avro_types(table)
click to toggle source
# File lib/interface/mysql.rb, line 124
# Builds a hash from column name to Avro type name for +table+, by passing
# each MySQL type returned by sql_schema through MySql.avro_type.
#
# @param table [String] table name
# @return [Hash] column name => Avro type name
def avro_types(table)
  sql_schema(table).each_with_object({}) do |(column, mysql_type), mapping|
    mapping[column] = MySql.avro_type(mysql_type)
  end
end
data(table, min_id, max_id, &block)
click to toggle source
# File lib/interface/mysql.rb, line 50
# Reads the rows of +table+ whose id lies between +min_id+ and +max_id+
# (both inclusive) and passes each one to the block as a Hash keyed by
# column name.
#
# NOTE: all non-null values are wrapped in a mapping from type to value,
# because that's what the Avro spec requires; see:
# - http://avro.apache.org/docs/current/spec.html#json_encoding
# - http://mail-archives.apache.org/mod_mbox/avro-user/201304.mbox/%3CCD86687D.E892E%25scott@richrelevance.com%3E
#
# @param table [String] table name (interpolated into the SQL as-is)
# @param min_id [Integer] lowest id to include
# @param max_id [Integer] highest id to include
# @yield [row] Hash of column name => nil or { avro_type => value }
# @raise [RuntimeError] when a column has an Avro type this method cannot cast
def data(table, min_id, max_id, &block)
  columns = nil
  types = avro_types(table)
  sql = "SELECT * FROM #{table} WHERE id >= #{min_id} AND id <= #{max_id}"

  query(sql) do |line|
    # The first line is the header carrying the column names.
    if columns.nil?
      columns = line
      next
    end

    row = columns.zip(line).each_with_object({}) do |(colname, raw), h|
      # The client prints SQL NULL as the literal text NULL.
      if raw == "NULL"
        h[colname] = nil
        next
      end

      # A row shorter than the header means trailing empty fields were
      # dropped while splitting the line; they were empty strings.
      value = raw || ''

      type = types[colname]
      h[colname] =
        case type
        when 'boolean'
          # Any non-zero value is true (MySQL stores TRUE as 1).
          { type => !value.to_i.zero? }
        when 'int', 'long'
          { type => value.to_i }
        when 'float', 'double'
          { type => value.to_f }
        when 'bytes', 'string'
          { type => value }
        else
          raise "Unsupported type: #{type}"
        end
    end

    block.call(row)
  end
end
max_id(table)
click to toggle source
# File lib/interface/mysql.rb, line 38
# Runs SELECT MAX(id) against +table+ and returns the first field of the
# first line after the header, converted with to_i.
#
# @param table [String] table name (interpolated into the SQL as-is)
# @return [Integer] the converted value; if no line follows the header,
#   whatever query itself returns
def max_id(table)
  past_header = false
  query("SELECT MAX(id) FROM #{table}") do |fields|
    # Leave the method as soon as the first line after the header arrives.
    return fields.first.to_i if past_header

    past_header = true
  end
end
query(sql, &block)
click to toggle source
# File lib/interface/mysql.rb, line 135
# Instance-level convenience: forwards +sql+ and the block to MySql.query
# together with the connection settings stored on this object.
#
# @param sql [String] statement(s) to execute
# @yield forwarded unchanged to MySql.query
def query(sql, &block)
  connection = [@db_host, @db_name, @username, @password]
  MySql.query(sql, *connection, &block)
end
schema(table)
click to toggle source
# File lib/interface/mysql.rb, line 22
# Builds a record-schema Hash for +table+: one field per column, each typed
# as the pair ['null', <avro type>] using the types from avro_types.
#
# @param table [String] table name, also used as the record name
# @return [Hash] with keys :type, :name and :fields
def schema(table)
  fields = avro_types(table).map do |column, avro_type|
    { name: column, type: ['null', avro_type] }
  end

  { type: "record", name: table, fields: fields }
end
sql_schema(table)
click to toggle source
# File lib/interface/mysql.rb, line 107
# Runs DESCRIBE on +table+ and collects, for every line after the header,
# the first field as the column name and the second as its MySQL type.
#
# @param table [String] table name (interpolated into the SQL as-is)
# @return [Hash] column name => MySQL type string, in output order
def sql_schema(table)
  described = {}
  past_header = false

  query("DESCRIBE #{table}") do |fields|
    if past_header
      described[fields[0]] = fields[1]
    else
      # The first line only names DESCRIBE's own output columns.
      past_header = true
    end
  end

  described
end