class Fluent::SQLInput
Constants
- SKIP_TABLE_REGEXP
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sql.rb, line 144
# Applies the plugin configuration.
#
# Delegates to the superclass implementation first, logs two warnings when
# no state file has been configured, builds one TableElement for every
# child element named 'table', and turns on all-tables mode when the
# 'all_tables' option is set.
#
# @param conf configuration object; its `elements` are scanned for
#   entries whose `name` is 'table'
# @return [true, nil] true when 'all_tables' is set, otherwise nil
def configure(conf)
  super

  unless @state_file
    [
      "'state_file PATH' parameter is not set to a 'sql' source.",
      "this parameter is highly recommended to save the last rows to resume tailing."
    ].each { |message| $log.warn message }
  end

  table_sections = conf.elements.select { |element| element.name == 'table' }
  @tables = table_sections.map do |section|
    TableElement.new.tap { |table| table.configure(section) }
  end

  @all_tables = true if config['all_tables']
end
generate_schema(table, schema_name)
click to toggle source
# File lib/fluent/plugin/in_sql.rb, line 262
# Derives an Avro record schema from a table's columns, registers it with
# the schema registry under the subject "<schema_name>-value", and caches
# the result through set_schema_to_redis.
#
# Every column becomes a nullable field (['null', <type>]). The Avro type
# is chosen by substring match on the column's SQL type, checked in this
# order: 'bigint' => 'long'; 'int' or 'bool' => 'int'; 'float', 'double'
# or 'real' => 'float'; anything else => 'string'.
#
# @param table object whose `table` attribute names the database table
# @param schema_name name given to the Avro record; also the cache key
# @return the result of set_schema_to_redis
def generate_schema(table, schema_name)
  require "avro_turf"
  require 'avro_turf/messaging'
  require "avro/builder"

  messaging = AvroTurf::Messaging.new(registry_url: "http://#{KAFKA_SERVER}:8081")

  fields = @base_model.connection.columns(table.table).map do |column|
    sql_type = column.sql_type
    avro_type =
      if sql_type.include?('bigint')
        'long'
      elsif %w[int bool].any? { |needle| sql_type.include?(needle) }
        'int'
      elsif %w[float double real].any? { |needle| sql_type.include?(needle) }
        'float'
      else
        'string'
      end

    { 'name' => column.name, 'type' => ['null', avro_type] }
  end

  # Captured before the two extra fields are appended, so the map only
  # describes real table columns.
  field_types = fields.each_with_object({}) do |field, types|
    types[field['name']] = (field['type'] - ['null']).first
  end

  fields.push(
    { "name" => "enchilada_timestamp", "type" => "long" },
    { "name" => "enchilada_time_with_format", "type" => "string" }
  )

  schema_json = { type: "record", name: schema_name, fields: fields }.to_json

  # The messaging object's registry client is read straight from its
  # instance variable, exactly as the original code does.
  registry = messaging.instance_variable_get(:@registry)
  schema = Avro::Schema.parse(schema_json)
  schema_id = registry.register("#{schema_name}-value", schema)

  set_schema_to_redis(
    schema_name,
    {
      'schema_json' => schema_json,
      'schema_id' => schema_id,
      'field_types' => field_types,
      'schema' => schema
    }
  )
end
get_schema_from_redis_by_name(schema_name)
click to toggle source
# File lib/fluent/plugin/in_sql.rb, line 308
# Fetches the cached schema entry stored under +schema_name+.
#
# The caller in #start treats a falsy result as "no schema cached yet".
#
# @param schema_name key the schema was stored under
# @return whatever `$redis.get` returns for that key
def get_schema_from_redis_by_name(schema_name)
  $redis.get(schema_name)
end
init_redis()
click to toggle source
# File lib/fluent/plugin/in_sql.rb, line 258
# Creates a Redis client with default options and publishes it through
# the `$redis` global, which the schema cache helpers read.
#
# @return the newly created client
def init_redis
  client = Redis.new
  $redis = client
end
set_schema_to_redis(schema_name, schema)
click to toggle source
# File lib/fluent/plugin/in_sql.rb, line 304
# Serializes +schema+ to JSON and stores it under +schema_name+ using the
# `$redis` global client.
#
# @param schema_name key to store the schema under
# @param schema object responding to `to_json`
# @return whatever `$redis.set` returns
def set_schema_to_redis(schema_name, schema)
  payload = schema.to_json
  $redis.set(schema_name, payload)
end
shutdown()
click to toggle source
# File lib/fluent/plugin/in_sql.rb, line 237
# Requests that the polling loop stop.
#
# Only raises the stop flag; #thread_main checks it at the top of each
# iteration. This method does not wait for the thread.
#
# @return [true]
def shutdown
  @stop_flag = true
end
start()
click to toggle source
# File lib/fluent/plugin/in_sql.rb, line 167
# Starts the input.
#
# Steps, in order:
# 1. choose the state store (in-memory when no state file is set);
# 2. create a private abstract ActiveRecord base class and connect it
#    using this plugin's connection settings;
# 3. in all-tables mode, replace @tables with one TableElement per
#    database table whose name does not match SKIP_TABLE_REGEXP;
# 4. initialise Redis, then for each table make sure a schema is cached
#    (generating one if needed) and initialise the table element; tables
#    for which any of this raises are logged and dropped;
# 5. clear the stop flag and launch the polling thread.
#
# @return [Thread] the thread running #thread_main
def start
  @state_store =
    if @state_file.nil?
      MemoryStateStore.new
    else
      StateStore.new(@state_file)
    end

  db_config = {
    adapter: @adapter,
    host: @host,
    port: @port,
    database: @database,
    username: @username,
    password: @password,
    socket: @socket,
  }

  # A dedicated subclass of ActiveRecord::Base lets this plugin hold its
  # own database configuration. It is abstract because it has no
  # corresponding physical table.
  @base_model = Class.new(ActiveRecord::Base) { self.abstract_class = true }

  # ActiveRecord requires the model class to have a name. An anonymous
  # class takes the name of the first constant it is assigned to.
  SQLInput.const_set("BaseModel_#{rand(1 << 31)}", @base_model)

  # From here on the base model is configured independently of
  # ActiveRecord::Base.
  @base_model.establish_connection(db_config)

  if @all_tables
    # Take the table list from the database, ignoring tables such as
    # "schema_migrations" that match the skip pattern.
    table_names = @base_model.connection.tables.reject do |table_name|
      table_name.match(SKIP_TABLE_REGEXP)
    end

    @tables = table_names.map do |table_name|
      TableElement.new.tap do |element|
        element.configure({
          'table' => table_name,
          'tag' => table_name,
          'update_column' => nil,
        })
      end
    end
  end

  init_redis

  # Drop every table whose schema setup or TableElement#init fails.
  @tables.reject! do |te|
    begin
      table_label = te.tag.presence || te.table
      column_names = @base_model.connection.columns(te.table).map(&:name)
      # First six hex digits of the MD5 of the column-name list.
      fingerprint = Digest::MD5.new.hexdigest(column_names.to_s)[0..5]
      schema_name = "#{SERVER_PREFIX}_#{@tag_prefix}_#{table_label}_#{fingerprint}"

      generate_schema(te, schema_name) unless get_schema_from_redis_by_name(schema_name)

      te.init(@tag_prefix, @base_model, router)
      log.info "Selecting '#{te.table}' table"
      false
    rescue => e
      log.warn "Can't handle '#{te.table}' table. Ignoring.", :error => e.message, :error_class => e.class
      log.warn_backtrace e.backtrace
      true
    end
  end

  @stop_flag = false
  @thread = Thread.new(&method(:thread_main))
end
thread_main()
click to toggle source
# File lib/fluent/plugin/in_sql.rb, line 241
# Polling loop run by the thread created in #start.
#
# Until the stop flag is set, sleeps for @select_interval and then, for
# each table, emits the next batch of records (at most @select_limit,
# starting after the stored last record), records the new position in the
# state store and persists it. An error on one table is logged and does
# not stop the loop or the remaining tables.
#
# @return [nil]
def thread_main
  until @stop_flag
    sleep @select_interval

    @tables.each do |table_element|
      begin
        previous = @state_store.last_records[table_element.table]
        latest = table_element.emit_next_records(previous, @select_limit)
        @state_store.last_records[table_element.table] = latest
        @state_store.update!
      rescue => e
        log.error "unexpected error", :error => e.message, :error_class => e.class
        log.error_backtrace e.backtrace
      end
    end
  end
end