class Fluent::SQLOutput::TableElement

TODO: Merge this class with SQLInput's TableElement.

Attributes

model[R]
pattern[R]

Public Class Methods

new(pattern, log, enable_fallback) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_sql.rb, line 54
# Builds a table element for one output table.
#
# pattern         - tag pattern; compiled with MatchPattern.create and
#                   exposed through the +pattern+ reader.
# log             - logger that receives the warn/error messages emitted
#                   by #import and #one_by_one_import.
# enable_fallback - when truthy, #import retries record-by-record after a
#                   deterministic bulk-insert error instead of re-raising.
def initialize(pattern, log, enable_fallback)
  super()
  @log = log
  @enable_fallback = enable_fallback
  @pattern = MatchPattern.create(pattern)
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_sql.rb, line 61
# Applies the configuration and prepares the record formatter.
#
# After delegating to the superclass, parses @column_mapping into @mapping
# (record key => column name) and stores @format_proc, a proc that turns an
# incoming record into a hash keyed by column name.
#
# NOTE(review): @column_mapping is presumably populated by the superclass
# configure call — confirm against the config_param declarations.
def configure(conf)
  super

  @mapping = parse_column_mapping(@column_mapping)
  @format_proc = Proc.new do |record|
    # Record keys absent from the incoming record yield nil column values.
    @mapping.each_with_object({}) do |(source_key, column), row|
      row[column] = record[source_key]
    end
  end
end
import(chunk) click to toggle source
# File lib/fluent/plugin/out_sql.rb, line 91
# Bulk-imports every record of +chunk+ through @model.
#
# Each entry yielded by chunk.msgpack_each is converted with @format_proc
# and wrapped in a new @model instance. Entries that fail this step are
# logged as warnings and skipped. The collected instances are then handed
# to @model.import in a single call.
#
# When the bulk import raises ActiveRecord::StatementInvalid or
# ActiveRecord::Import::MissingColumnError:
# * with @enable_fallback set, the records are retried individually via
#   #one_by_one_import;
# * otherwise the error is logged and re-raised.
#
# All messages go through the injected @log so that they are reported
# consistently with the rest of this class.
def import(chunk)
  records = []
  chunk.msgpack_each { |tag, time, data|
    begin
      # format process should be moved to emit / format after supports error stream.
      records << @model.new(@format_proc.call(data))
    rescue => e
      args = {:error => e.message, :error_class => e.class, :table => @table, :record => Yajl.dump(data)}
      @log.warn "Failed to create the model. Ignore a record:", args
    end
  }
  begin
    @model.import(records)
  rescue ActiveRecord::StatementInvalid, ActiveRecord::Import::MissingColumnError => e
    if @enable_fallback
      # Other exceptions are deliberately not rescued here so that the
      # Fluentd retry mechanism can handle them.
      @log.warn "Got deterministic error. Fallback to one-by-one import", :error => e.message, :error_class => e.class
      one_by_one_import(records)
    else
      @log.warn "Got deterministic error. Fallback is disabled", :error => e.message, :error_class => e.class
      raise e
    end
  end
end
init(base_model) click to toggle source
# File lib/fluent/plugin/out_sql.rb, line 74
# Creates the model class for this table and stores it in @model.
#
# An anonymous subclass of +base_model+ is bound to @table, registered as a
# constant under +base_model+ (named after the singularized, camelized table
# name) and given an explicit +model_name+.
def init(base_model)
  # See SQLInput for more details of following code
  # @table is copied to a local because the Class.new block below runs with
  # the new class as self, where @table would not be this object's ivar.
  source_table = @table
  @model = Class.new(base_model) do
    self.table_name = source_table
    # NOTE(review): presumably keeps a column named "type" from being used
    # for single-table inheritance — confirm against the ActiveRecord docs.
    self.inheritance_column = '_never_use_output_'
  end

  const_name = source_table.singularize.camelize
  base_model.const_set(const_name, @model)
  resolved_name = ActiveModel::Name.new(@model, nil, const_name)
  @model.define_singleton_method(:model_name) { resolved_name }

  # TODO: check column_names and table schema
  # @model.column_names
end
one_by_one_import(records) click to toggle source
# File lib/fluent/plugin/out_sql.rb, line 116
# Imports +records+ one at a time; used as the fallback after a failed
# bulk import.
#
# For each record:
# * ActiveRecord::StatementInvalid / ActiveRecord::Import::MissingColumnError
#   is logged as an error and the record is dropped without retrying;
# * any other StandardError is retried after a 0.5 second pause, up to
#   @num_retries times, after which the record is logged and dropped.
#
# Returns +records+ (the result of +each+).
def one_by_one_import(records)
  records.each do |record|
    attempts = 0
    begin
      @model.import([record])
    rescue ActiveRecord::StatementInvalid, ActiveRecord::Import::MissingColumnError => e
      @log.error "Got deterministic error again. Dump a record", :error => e.message, :error_class => e.class, :record => record
    rescue => e
      attempts += 1
      if attempts <= @num_retries
        @log.warn "Failed to import a record: retry number = #{attempts}", :error  => e.message, :error_class => e.class
        sleep 0.5
        retry
      end

      # Retry budget exhausted: give up on this record and move on.
      @log.error "Can't recover undeterministic error. Dump a record", :error => e.message, :error_class => e.class, :record => record
    end
  end
end

Private Instance Methods

parse_column_mapping(column_mapping_conf) click to toggle source
# File lib/fluent/plugin/out_sql.rb, line 139
# Parses a comma-separated column mapping string into a Hash.
#
# Each entry is either "key:column" or a bare "key", which maps the key to
# a column of the same name. Only the first ':' separates key from column,
# so the column part may itself contain ':'. Surrounding whitespace of each
# entry is stripped. Blank entries (for example from "a,,b" or "a, ,b") are
# skipped so they cannot introduce a nil => nil mapping.
#
# column_mapping_conf - String such as "id:uid,name".
#
# Returns a Hash of record key => column name.
def parse_column_mapping(column_mapping_conf)
  column_mapping_conf.split(',').each_with_object({}) { |column_map, mapping|
    entry = column_map.strip
    next if entry.empty?

    key, column = entry.split(':', 2)
    mapping[key] = column || key
  }
end