class Fluent::Plugin::MysqlSelectInsertOutput
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_mysql_select_insert.rb, line 55 def configure(conf) super if select_query =~ /\bwhere\b/i fail Fluent::ConfigError, "You can't specify WHERE clause in 'select_query'" end if select_query !~ /\A\s*select\b/i fail Fluent::ConfigError, "'select_query' should begin with 'SELECT'" end end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_mysql_select_insert.rb, line 98 def multi_workers_ready? true end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_mysql_select_insert.rb, line 66 def write(chunk) client = new_client condition_values = [] chunk.msgpack_each do |time, record| condition_values << "'#{client.escape(record[condition_key])}'" end sql = <<~SQL INSERT #{"IGNORE" if ignore} INTO `#{table}` #{"(#{inserted_columns.join(",")})" if inserted_columns} #{select_query} WHERE #{condition_column} IN (#{condition_values.join(",")}) SQL sql << " AND (#{extra_condition[0]})" unless extra_condition.empty? bound_params = extra_condition[1..-1]&.map { |c| extract_placeholders(c, chunk.metadata) } begin if bound_params.nil? || bound_params.empty? client.query(sql) else require "mysql2-cs-bind" client.xquery(sql, bound_params) end rescue Mysql2::Error => e if e.message.start_with?("Column count doesn't match value count") raise Fluent::UnrecoverableError, "#{e.class}: #{e}" end raise end client.close end
Private Instance Methods
new_client()
click to toggle source
# File lib/fluent/plugin/out_mysql_select_insert.rb, line 104 def new_client Mysql2::Client.new( host: @host, port: @port, username: @username, password: @password, database: @database, ) end