module Sql2Avro
Constants
- AVRO_TOOLS_PATH
Public Class Methods
avroize(database_config, table, min_id, max_rows_per_batch=nil, directory='/tmp')
click to toggle source
Pulls data from the given database table starting from the given id.
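This function creates an Avro file in directory as a side effect, and returns a hash: {
max_id: the upper id bound used for this batch (the table's current max id, capped at min_id + max_rows_per_batch when a batch size is given) — not necessarily the greatest id actually present in the file, path: filepath of the resulting Avro file
}
Errors are not reported in the returned hash; failures raise a RuntimeError instead.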
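database_config is a hash with this form (like ActiveRecord's). The "adapter" entry must use a String key, since it is looked up as database_config['adapter']: {
"adapter" => "mysql", "host" => "localhost", "username" => "myuser", "password" => "mypass", "database" => "somedatabase"
}
Only the "mysql" and "mysql2" adapters are supported.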
table is the table to pull from.
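min_id specifies the value of the id column from which to start.
max_rows_per_batch, if given, caps the batch's upper id bound at min_id + max_rows_per_batch; if nil, the batch extends to the table's current max id.
directory is where the Avro file (and its temporary JSON intermediate) is written; it defaults to /tmp.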
# File lib/sql2avro.rb, line 30
#
# Dumps the rows of +table+ between +min_id+ and a batch ceiling into an
# Avro file (snappy codec) inside +directory+.
#
# Rows are first streamed as newline-delimited JSON into a temporary
# "<avro filename>.json" file, which is then converted with the avro-tools
# +fromjson+ command and deleted.
#
# @param database_config [Hash] connection settings; must contain the String
#   key 'adapter' with value 'mysql' or 'mysql2'. The hash is passed as-is to
#   MySql.new.
# @param table [String] table to pull from.
# @param min_id [Integer] id value from which to start.
# @param max_rows_per_batch [Integer, nil] when given, the batch ceiling is
#   min(table max id, min_id + max_rows_per_batch); when nil, the table max id.
# @param directory [String] directory receiving the Avro and temporary JSON files.
# @return [Hash] { max_id: batch ceiling id, path: path of the Avro file }
# @raise [RuntimeError] if the adapter is missing or unsupported, if the
#   JSON-to-Avro conversion fails, or if the temporary JSON file cannot be deleted.
def Sql2Avro.avroize(database_config, table, min_id, max_rows_per_batch=nil, directory='/tmp')
  raise "Database interface not specified." unless database_config.has_key? 'adapter'
  unless ['mysql', 'mysql2'].include? database_config['adapter']
    raise "Database interface not supported: #{database_config['adapter']}"
  end

  interface = MySql.new(database_config)
  schema = Yajl::Encoder.encode(interface.schema(table))

  max_id = interface.max_id(table)
  max_id_this_batch = max_rows_per_batch.nil? ? max_id : [max_id, min_id + max_rows_per_batch].min

  # Same text as splitting Time#to_s ("YYYY-MM-DD HH:MM:SS UTC") and re-joining with T/Z.
  timestamp = Time.now.utc.strftime('%Y-%m-%dT%H:%M:%SZ')
  filename = "#{table}.#{timestamp}.#{min_id}.#{max_id_this_batch}.avro"
  avro_path = File.join(directory, filename)
  json_file = File.join(directory, "#{filename}.json")

  # default_internal is cleared while dumping, presumably to avoid implicit
  # transcoding of database strings on write. It is process-wide state, so it
  # is restored even if the dump raises.
  prev_default_internal = Encoding.default_internal
  Encoding.default_internal = nil
  begin
    File.open(json_file, 'w') do |f|
      interface.data(table, min_id, max_id_this_batch) do |datum|
        Yajl::Encoder.encode(datum, f)
        f.write "\n"
      end
    end
  ensure
    Encoding.default_internal = prev_default_internal
  end

  # Run java without a shell: the schema JSON may contain quotes, and table
  # and directory names end up in the paths, so nothing is shell-interpolated.
  argv = ['java', '-jar', AVRO_TOOLS_PATH.to_s, 'fromjson', '--codec', 'snappy', '--schema', schema, json_file]
  cmd = "#{argv.join(' ')} > #{avro_path}" # human-readable form, for error messages only
  unless system(*argv, out: avro_path)
    # The JSON file is left in place on failure so it can be inspected.
    raise "Error converting JSON to Avro.\n\nCommand: #{cmd}\nStatus: #{$?}"
  end

  begin
    File.delete(json_file)
  rescue SystemCallError
    raise "Error deleting temporary JSON file #{json_file}"
  end

  { max_id: max_id_this_batch, path: avro_path }
end