class Fluent::Plugin::MysqlSelectInsertOutput

Public Instance Methods

configure(conf)
Calls superclass method
# File lib/fluent/plugin/out_mysql_select_insert.rb, line 55
# Validates the plugin configuration after the superclass has handled +conf+.
#
# 'select_query' may not contain the word WHERE (the write method appends its
# own WHERE clause after the query) and must begin with SELECT, ignoring
# leading whitespace and letter case.
#
# @param conf the configuration object, forwarded unchanged to the superclass
# @raise [Fluent::ConfigError] if 'select_query' contains WHERE or does not
#   begin with SELECT; the WHERE check is performed first
def configure(conf)
  super

  contains_where = /\bwhere\b/i.match?(select_query)
  raise Fluent::ConfigError, "You can't specify WHERE clause in 'select_query'" if contains_where

  starts_with_select = /\A\s*select\b/i.match?(select_query)
  raise Fluent::ConfigError, "'select_query' should begin with 'SELECT'" unless starts_with_select
end
multi_workers_ready?()
# File lib/fluent/plugin/out_mysql_select_insert.rb, line 98
# Declares this plugin as ready for multi-worker operation.
#
# @return [Boolean] always +true+
def multi_workers_ready?
  true
end
write(chunk)
# File lib/fluent/plugin/out_mysql_select_insert.rb, line 66
      # Runs one `INSERT ... SELECT` statement for the records in +chunk+.
      #
      # The value stored under +condition_key+ in every record is escaped and
      # collected into an `IN (...)` list on +condition_column+, which is
      # appended to +select_query+ as its WHERE clause. The first element of
      # +extra_condition+, if present, is ANDed onto that clause; any remaining
      # elements are passed through +extract_placeholders+ and bound as query
      # parameters.
      #
      # The client is closed whether or not the query succeeds. Nothing is sent
      # when the chunk yields no records.
      #
      # @param chunk the buffer chunk; must respond to +msgpack_each+ and
      #   +metadata+
      # @raise [Fluent::UnrecoverableError] when MySQL reports a column count
      #   mismatch (presumably so the chunk is not retried -- confirm against
      #   Fluentd's retry handling)
      # @raise [Mysql2::Error] for any other MySQL failure, re-raised unchanged
      def write(chunk)
        client = new_client
        condition_values = []
        chunk.msgpack_each do |_time, record|
          condition_values << "'#{client.escape(record[condition_key])}'"
        end
        # An empty list would render as `IN ()`, which MySQL rejects as a
        # syntax error, so there is nothing useful to execute.
        return if condition_values.empty?

        sql = <<~SQL
          INSERT #{"IGNORE" if ignore} INTO `#{table}` #{"(#{inserted_columns.join(",")})" if inserted_columns}
          #{select_query}
          WHERE #{condition_column} IN (#{condition_values.join(",")})
        SQL
        sql << " AND (#{extra_condition[0]})" unless extra_condition.empty?

        # Everything after the first element is a bind value for the
        # placeholders in the extra condition; nil when extra_condition is empty.
        bound_params = extra_condition[1..-1]&.map { |c| extract_placeholders(c, chunk.metadata) }
        begin
          if bound_params.nil? || bound_params.empty?
            client.query(sql)
          else
            # Loaded lazily: only needed when bind parameters are in use.
            require "mysql2-cs-bind"
            client.xquery(sql, bound_params)
          end
        rescue Mysql2::Error => e
          if e.message.start_with?("Column count doesn't match value count")
            raise Fluent::UnrecoverableError, "#{e.class}: #{e}"
          end
          raise
        end
      ensure
        # Release the connection on every exit path, including errors and the
        # early return above. `client` is nil if new_client itself raised.
        client&.close
      end

Private Instance Methods

new_client()
# File lib/fluent/plugin/out_mysql_select_insert.rb, line 104
# Builds a Mysql2::Client from the plugin's connection settings.
#
# @return [Mysql2::Client] a client created with @host, @port, @username,
#   @password and @database
def new_client
  connection_options = {
    host: @host,
    port: @port,
    username: @username,
    password: @password,
    database: @database,
  }
  Mysql2::Client.new(**connection_options)
end