class Fluent::SQLOutput

Attributes

tables[RW]

Public Class Methods

desc(description)
# File lib/fluent/plugin/out_sql.rb, line 13
# No-op class-level hook: accepts a description and does nothing with it.
# NOTE(review): presumably a fallback so that `desc "..."` annotations still
# work on Fluentd versions that do not provide `desc` themselves — confirm.
#
# @param description [Object] ignored
# @return [nil]
def desc(description)
  nil
end
new()
Calls superclass method
# File lib/fluent/plugin/out_sql.rb, line 150
# Runs the superclass initializer, then loads the ActiveRecord libraries this
# output uses. The requires are issued here, when an instance is created,
# rather than when the file is loaded.
def initialize
  super
  %w[active_record activerecord-import].each { |library| require library }
end

Public Instance Methods

configure(conf)
Calls superclass method
# File lib/fluent/plugin/out_sql.rb, line 156
# Reads the plugin configuration and builds the table routing state.
#
# * `remove_tag_prefix` (optional) is compiled into a Regexp anchored at the
#   very start of the tag (`\A`), so only a genuine prefix can match.
# * Every `<table>` element becomes a TableElement. An element with an empty
#   argument is the default table (a later one replaces an earlier one, with
#   a warning); all others are collected in @tables in declaration order.
#
# @param conf [#[], #elements] configuration element for this output
# @raise [ConfigError] when no default `<table>` (empty argument) is defined
def configure(conf)
  super

  prefix = conf['remove_tag_prefix']
  @remove_tag_prefix = /\A#{Regexp.escape(prefix)}/ if prefix

  @tables = []
  @default_table = nil
  conf.elements.select { |e| e.name == 'table' }.each do |e|
    te = TableElement.new(e.arg, log, @enable_fallback)
    te.configure(e)
    if e.arg.empty?
      # Use the plugin logger (as everywhere else in this class), not the global $log.
      log.warn "Detect duplicate default table definition" if @default_table
      @default_table = te
    else
      @tables << te
    end
  end
  # When true, emit does not pass a formatted tag to the superclass.
  @only_default = @tables.empty?

  if @default_table.nil?
    raise ConfigError, "There is no default table. <table> is required in sql output"
  end
end
emit(tag, es, chain)
Calls superclass method
# File lib/fluent/plugin/out_sql.rb, line 215
# Forwards the event stream to the superclass.
#
# With only the default table configured the call is passed through as-is.
# Otherwise the tag, after format_tag has processed it, is supplied as a
# fourth argument (presumably the chunk key that write later matches against
# each table's pattern — confirm against the superclass).
def emit(tag, es, chain)
  return super(tag, es, chain) if @only_default

  super(tag, es, chain, format_tag(tag))
end
format(tag, time, record)
# File lib/fluent/plugin/out_sql.rb, line 223
# Serializes one event as a MessagePack-encoded three-element array
# of [tag, time, record].
#
# @return the result of Array#to_msgpack for the event triple
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
shutdown()
Calls superclass method
# File lib/fluent/plugin/out_sql.rb, line 211
# Shutdown hook. Does nothing of its own; it only delegates to the
# superclass implementation and returns its result.
def shutdown
  super()
end
start()
Calls superclass method
# File lib/fluent/plugin/out_sql.rb, line 184
# Start hook: connects to the database and prepares every configured table.
#
# After delegating to the superclass it
# 1. collects the connection settings from the plugin's instance variables,
# 2. creates an anonymous abstract ActiveRecord::Base subclass, stores it in
#    @base_model and registers it under a randomly numbered BaseModel_*
#    constant on SQLOutput,
# 3. establishes the connection on ActiveRecord::Base itself,
# 4. initializes each table through init_table, dropping from @tables the
#    ones whose initialization failed, and finally initializes the default
#    table (its result is returned but not otherwise acted upon here).
def start
  super

  connection_settings = {
    adapter: @adapter,
    host: @host,
    port: @port,
    database: @database,
    username: @username,
    password: @password,
    socket: @socket,
  }

  @base_model = Class.new(ActiveRecord::Base) { self.abstract_class = true }
  SQLOutput.const_set("BaseModel_#{rand(1 << 31)}", @base_model)
  ActiveRecord::Base.establish_connection(connection_settings)

  # init_table answers true on failure, so failed tables are rejected here.
  @tables.reject! { |te| init_table(te, @base_model) }
  init_table(@default_table, @base_model)
end
write(chunk)
# File lib/fluent/plugin/out_sql.rb, line 227
# Imports a buffered chunk into the table it belongs to.
#
# Inside a connection checked out from ActiveRecord::Base's pool, the first
# entry of @tables whose pattern matches chunk.key receives the chunk; when
# none matches, the default table does.
#
# @param chunk [#key] buffered chunk; it is handed unchanged to the table's import
# @return whatever the selected table's import returns
def write(chunk)
  ActiveRecord::Base.connection_pool.with_connection do
    target = @tables.find { |table| table.pattern.match(chunk.key) } || @default_table
    target.import(chunk)
  end
end

Private Instance Methods

format_tag(tag)
# File lib/fluent/plugin/out_sql.rb, line 253
# Strips the configured tag prefix from +tag+.
#
# @remove_tag_prefix is the Regexp that configure builds from the
# `remove_tag_prefix` option; when it is unset the tag is returned untouched.
# A prefix can occur only once, so a single substitution (`sub`) is used
# rather than a global one.
#
# @param tag [String] event tag
# @return [String] the tag without the prefix, or the tag itself
def format_tag(tag)
  return tag unless @remove_tag_prefix

  tag.sub(@remove_tag_prefix, '')
end
init_table(te, base_model)
# File lib/fluent/plugin/out_sql.rb, line 241
# Initializes one table element against the given base model.
#
# Any StandardError raised while initializing (or while logging success) is
# logged as a warning together with its backtrace instead of propagating.
#
# @param te [#init, #table] table element to initialize
# @param base_model [Class] model class handed to te.init
# @return [Boolean] false on success, true when the table could not be
#   handled (start uses this as the reject! predicate)
def init_table(te, base_model)
  te.init(base_model)
  log.info "Selecting '#{te.table}' table"
  false
rescue => error
  log.warn "Can't handle '#{te.table}' table. Ignoring.", error: error.message, error_class: error.class
  log.warn_backtrace error.backtrace
  true
end