class ActiveRecord::ConnectionAdapters::ClickhouseAdapter

Constants

ADAPTER_NAME
NATIVE_DATABASE_TYPES

Public Class Methods

new(logger, connection_parameters, config, full_config) click to toggle source

Initializes and connects a Clickhouse adapter.

Calls superclass method
# File lib/active_record/connection_adapters/clickhouse_adapter.rb, line 128
def initialize(logger, connection_parameters, config, full_config)
  super(nil, logger)
  @connection_parameters = connection_parameters
  @config = config
  @debug = full_config[:debug] || false
  @full_config = full_config

  @prepared_statements = false
  if ActiveRecord::version == Gem::Version.new('6.0.0')
    @prepared_statement_status = Concurrent::ThreadLocalVar.new(false)
  end

  connect
end

Public Instance Methods

apply_cluster(sql) click to toggle source
# File lib/active_record/connection_adapters/clickhouse_adapter.rb, line 331
def apply_cluster(sql)
  cluster ? "#{sql} ON CLUSTER #{cluster}" : sql
end
change_column(table_name, column_name, type, options = {}) click to toggle source
# File lib/active_record/connection_adapters/clickhouse_adapter.rb, line 304
def change_column(table_name, column_name, type, options = {})
  result = do_execute "ALTER TABLE #{quote_table_name(table_name)} #{change_column_for_alter(table_name, column_name, type, options)}"
  raise "Error parse json response: #{result}" if result.presence && !result.is_a?(Hash)
end
change_column_default(table_name, column_name, default) click to toggle source
# File lib/active_record/connection_adapters/clickhouse_adapter.rb, line 315
def change_column_default(table_name, column_name, default)
  change_column table_name, column_name, nil, {default: default}.compact
end
change_column_null(table_name, column_name, null, default = nil) click to toggle source
# File lib/active_record/connection_adapters/clickhouse_adapter.rb, line 309
def change_column_null(table_name, column_name, null, default = nil)
  structure = table_structure(table_name).select{|v| v[0] == column_name.to_s}.first
  raise "Column #{column_name} not found in table #{table_name}" if structure.nil?
  change_column table_name, column_name, structure[1].gsub(/(Nullable\()?(.*?)\)?/, '\2'), {null: null, default: default}.compact
end
cluster() click to toggle source
# File lib/active_record/connection_adapters/clickhouse_adapter.rb, line 319
def cluster
  @full_config[:cluster_name]
end
create_database(name) click to toggle source

Create a new ClickHouse database.

# File lib/active_record/connection_adapters/clickhouse_adapter.rb, line 254
def create_database(name)
  sql = apply_cluster "CREATE DATABASE #{quote_table_name(name)}"
  log_with_debug(sql, adapter_name) do
    res = @connection.post("/?#{@config.except(:database).to_param}", sql)
    process_response(res)
  end
end
create_table(table_name, **options) { |td| ... } click to toggle source
# File lib/active_record/connection_adapters/clickhouse_adapter.rb, line 275
def create_table(table_name, **options)
  options = apply_replica(table_name, options)
  td = create_table_definition(apply_cluster(table_name), options)
  yield td if block_given?

  if options[:force]
    drop_table(table_name, options.merge(if_exists: true))
  end

  execute schema_creation.accept td
end
create_view(table_name, **options) { |td| ... } click to toggle source
# File lib/active_record/connection_adapters/clickhouse_adapter.rb, line 262
def create_view(table_name, **options)
  options.merge!(view: true)
  options = apply_replica(table_name, options)
  td = create_table_definition(apply_cluster(table_name), options)
  yield td if block_given?

  if options[:force]
    drop_table(table_name, options.merge(if_exists: true))
  end

  execute schema_creation.accept td
end
migrations_paths() click to toggle source
# File lib/active_record/connection_adapters/clickhouse_adapter.rb, line 148
def migrations_paths
  @full_config[:migrations_paths] || 'db/migrate_clickhouse'
end
quoted_date(value) click to toggle source

Quoting time without microseconds

# File lib/active_record/connection_adapters/clickhouse_adapter.rb, line 211
def quoted_date(value)
  if value.acts_like?(:time)
    zone_conversion_method = ActiveRecord::Base.default_timezone == :utc ? :getutc : :getlocal

    if value.respond_to?(zone_conversion_method)
      value = value.send(zone_conversion_method)
    end
  end

  value.to_s(:db)
end
rename_table(table_name, new_name) click to toggle source
# File lib/active_record/connection_adapters/clickhouse_adapter.rb, line 296
def rename_table(table_name, new_name)
  do_execute apply_cluster "RENAME TABLE #{quote_table_name(table_name)} TO #{quote_table_name(new_name)}"
end
replica() click to toggle source
# File lib/active_record/connection_adapters/clickhouse_adapter.rb, line 323
def replica
  @full_config[:replica_name]
end
replica_path(table) click to toggle source
# File lib/active_record/connection_adapters/clickhouse_adapter.rb, line 327
def replica_path(table)
  "/clickhouse/tables/#{cluster}/#{@config[:database]}.#{table}"
end
show_create_table(table) click to toggle source

@param [String] table @return [String]

# File lib/active_record/connection_adapters/clickhouse_adapter.rb, line 249
def show_create_table(table)
  do_system_execute("SHOW CREATE TABLE `#{table}`")['data'].try(:first).try(:first).gsub(/[\n\s]+/m, ' ')
end
supports_insert_on_duplicate_skip?() click to toggle source
# File lib/active_record/connection_adapters/clickhouse_adapter.rb, line 335
def supports_insert_on_duplicate_skip?
  true
end
supports_insert_on_duplicate_update?() click to toggle source
# File lib/active_record/connection_adapters/clickhouse_adapter.rb, line 339
def supports_insert_on_duplicate_update?
  true
end
valid_type?(type) click to toggle source
# File lib/active_record/connection_adapters/clickhouse_adapter.rb, line 164
def valid_type?(type)
  !native_database_types[type].nil?
end

Protected Instance Methods

change_column_for_alter(table_name, column_name, type, options = {}) click to toggle source
# File lib/active_record/connection_adapters/clickhouse_adapter.rb, line 354
def change_column_for_alter(table_name, column_name, type, options = {})
  td = create_table_definition(table_name)
  cd = td.new_column_definition(column_name, type, options)
  schema_creation.accept(ChangeColumnDefinition.new(cd, column_name))
end
last_inserted_id(result) click to toggle source
# File lib/active_record/connection_adapters/clickhouse_adapter.rb, line 350
def last_inserted_id(result)
  result
end

Private Instance Methods

apply_replica(table, options) click to toggle source
# File lib/active_record/connection_adapters/clickhouse_adapter.rb, line 372
def apply_replica(table, options)
  if replica && cluster && options[:options]
    match = options[:options].match(/^(.*?MergeTree)\(([^\)]*)\)(.*?)$/)
    if match
      options[:options] = "Replicated#{match[1]}(#{([replica_path(table), replica].map{|v| "'#{v}'"} + [match[2].presence]).compact.join(', ')})#{match[3]}"
    end
  end
  options
end
connect() click to toggle source
# File lib/active_record/connection_adapters/clickhouse_adapter.rb, line 362
def connect
  @connection = @connection_parameters[:connection] || Net::HTTP.start(@connection_parameters[:host], @connection_parameters[:port], use_ssl: @connection_parameters[:ssl], verify_mode: OpenSSL::SSL::VERIFY_NONE)

  @connection.ca_file = @connection_parameters[:ca_file] if @connection_parameters[:ca_file]
  @connection.read_timeout = @connection_parameters[:read_timeout] if @connection_parameters[:read_timeout]
  @connection.write_timeout = @connection_parameters[:write_timeout] if @connection_parameters[:write_timeout]

  @connection
end