class RedisCounters::Dumpers::Engine

Класс дампер (движек дампера) - класс осуществляющий перенос данных счетчика в БД.

Может использоваться как напрямую так и с помощью DSL (см. модуль RedisCounters::Dumpers::Dsl::Engine).

Общий алгоритм работы:

- копируем данные счетчика в временную таблицу
- мерджим данные во все целевые таблицы
- удаляем перенесенные данные из счетчика

Все destinations должны быть в рамках одной БД. Все действия происходят в рамках соединения БД, первой destination.

Example:

dumper = Dumper.build do
  name :hits_by_day

  fields => {
    :company_id => :integer,
    :value => :integer,
    :date => :date,
    :start_month_date => :date,
  }

  destination do
    model            CompanyStatisticTotalByDay
    take             :company_id, :pages, :date
    key_fields       :company_id, :date
    increment_fields :pages
    map              :pages, :to => :value
    condition        'target.date = :date'
  end

  destination do
    model            CompanyStatisticTotalByMonth
    take             :company_id, :pages, :date
    key_fields       :company_id, :date
    increment_fields :pages
    map              :pages, :to => :value
    map              :date,  :to => :start_month_date
    condition        'target.date = :start_month_date'
  end

  on_before_merge do |dumper, connection|
    dumper.common_params = {
      :date             => dumper.date.strftime('%Y-%m-%d'),
      :start_month_date => dumper.date.beginning_of_month.strftime('%Y-%m-%d'),
    }
  end
end

dumper.process!(counter, date: Date.yesterday)

В результате все данные счетчика за вчера, будут смерджены в целевые таблицы, по ключевым полям: company_id и date, причем все поля кроме pages, будут просто записаны в таблицы, а поле pages будет инкрементировано с текущим значением, при обновлении. Данные будут удалены из счетчика. Все действия производятся транзакционно, как в БД, так и в Redis.

Constants

DATE_FORMAT

Attributes

args[R]
common_params[RW]

Хеш общий параметров. Данные хеш мерджится в каждую, поступающую от счетчика, строку данных.

counter[R]
destinations[RW]

Массив, целевых моделей для сохранение данных, Array. Каждый элемент массива это экземпляр класса Engine::Destination.

fields[R]

Список доступных для сохранение в целевые таблицы полей и их типов данных, в виде Hash. Доступны следующие типы данных: string, integer, date, timestamp, boolean. Преобразование типов производится непосредственно перед мерджем в целевые таблицы.

Example:

fields = {:company_id => :integer, :date => :timestamp}
name[R]

Название дампера

rows_processed[RW]
temp_table_name[RW]

Название temp таблицы, используемой для переноса данных. По умолчанию: “tmp_#{dumper_name}”

Public Class Methods

new() click to toggle source
# File lib/redis_counters/dumpers/engine.rb, line 148
def initialize
  @destinations = []
  @common_params  = {}
end

Public Instance Methods

fields=(value) click to toggle source
# File lib/redis_counters/dumpers/engine.rb, line 153
def fields=(value)
  @fields = value.with_indifferent_access
end
name=(value) click to toggle source
# File lib/redis_counters/dumpers/engine.rb, line 157
def name=(value)
  @name = value
  @temp_table_name = "tmp_#{@name}"
end
process!(counter, args = {}) click to toggle source

Public: Производит перенос данных счетчика.

counter - экземпляр счетчика. args - Hash - набор аргументов(кластер и/или партиции) для переноса данных.

Returns Fixnum - кол-во обработанных строк.

# File lib/redis_counters/dumpers/engine.rb, line 133
def process!(counter, args = {})
  @counter = counter
  @args = args

  db_transaction do
    merge_data
    start_redis_transaction
    delete_from_redis
  end

  commit_redis_transaction

  rows_processed
end

Protected Instance Methods

analyze_table() click to toggle source
# File lib/redis_counters/dumpers/engine.rb, line 261
      def analyze_table
        db_connection.execute <<-SQL
          ANALYZE #{temp_table_name}
        SQL
      end
batch_data() click to toggle source
# File lib/redis_counters/dumpers/engine.rb, line 215
def batch_data
  @current_batch.map! do |row|
    values = row.map do |field, value|
      next 'null' if value.nil?
      fields.fetch(field).eql?(:integer) ? value : quote(value)
    end

    "(#{values.join(',')})"
  end.join(',')
end
columns_definition() click to toggle source
# File lib/redis_counters/dumpers/engine.rb, line 267
def columns_definition
  @fields.map do |field, type|
    pg_field_type = case type
                    when :string, :text
                      'character varying(4000)'
                    when :integer, :serial, :number
                      'integer'
                    when :date, :timestamp, :boolean, :hstore
                      type.to_s
                    else
                      if type.is_a?(Array) && type.first == :enum
                        type.last.fetch(:name)
                      else
                        raise 'Unknown datatype %s for %s field' % [type, field]
                      end
                    end

    "#{field} #{pg_field_type}"
  end.join(',')
end
create_temp_table() click to toggle source
# File lib/redis_counters/dumpers/engine.rb, line 249
      def create_temp_table
        db_connection.execute <<-SQL
          CREATE TEMP TABLE #{temp_table_name} (
            #{columns_definition}
          ) ON COMMIT DROP
        SQL
      end
db_connection() click to toggle source
# File lib/redis_counters/dumpers/engine.rb, line 288
def db_connection
  destinations.first.connection
end
delete_from_redis() click to toggle source
# File lib/redis_counters/dumpers/engine.rb, line 226
def delete_from_redis
  redis_session.pipelined do |redis|
    counter.partitions(args).each do |partition|
      counter.delete_partition_direct!(args.merge(partition), redis)
    end
  end

  fire_callback(:on_after_delete, self, redis_session)
end
drop_temp_table() click to toggle source
# File lib/redis_counters/dumpers/engine.rb, line 257
def drop_temp_table
  db_connection.execute "DROP TABLE #{temp_table_name}"
end
fill_temp_table() click to toggle source
# File lib/redis_counters/dumpers/engine.rb, line 187
def fill_temp_table
  @rows_processed = counter.data(args) do |batch|
    @current_batch = batch
    prepare_batch
    insert_batch
  end
end
insert_batch() click to toggle source
# File lib/redis_counters/dumpers/engine.rb, line 209
      def insert_batch
        db_connection.execute <<-SQL
          INSERT INTO #{temp_table_name} VALUES #{batch_data}
        SQL
      end
merge_data() click to toggle source
# File lib/redis_counters/dumpers/engine.rb, line 171
def merge_data
  fire_callback(:on_before_merge, self, db_connection)

  # копируем данные счетчика в временную таблицу
  create_temp_table
  fill_temp_table
  analyze_table

  # мерджим в целевые таблицы
  destinations.each { |dest| dest.merge }

  fire_callback(:on_after_merge, self, db_connection)

  drop_temp_table
end
prepare_batch() click to toggle source
# File lib/redis_counters/dumpers/engine.rb, line 195
def prepare_batch
  fields_keys = fields.keys

  @current_batch.map! do |row|
    row.merge!(common_params)
    fire_callback(:on_prepare_row, self, row)

    # выбираем из хеша только указанные поля
    fields_keys.inject(HashWithIndifferentAccess.new) do |result, (field)|
      result.merge!(field => row.fetch(field))
    end
  end
end
redis_session() click to toggle source
# File lib/redis_counters/dumpers/engine.rb, line 236
def redis_session
  return @redis_session if defined?(@redis_session)

  client = if Gem::Version.new(::Redis::VERSION) < Gem::Version.new('4')
             counter.redis.client
           else
             counter.redis._client
           end
  redis = ::Redis.new(client.options)

  @redis_session = ::Redis::Namespace.new(counter.redis.namespace, redis: redis)
end