class RedisCounters::Dumpers::Engine
Класс дампер (движек дампера) - класс осуществляющий перенос данных счетчика в БД.
Может использоваться как напрямую так и с помощью DSL (см. модуль RedisCounters::Dumpers::Dsl::Engine
).
Общий алгоритм работы:
- копируем данные счетчика в временную таблицу - мерджим данные во все целевые таблицы - удаляем перенесенные данные из счетчика
Все destinations должны быть в рамках одной БД. Все действия происходят в рамках соединения БД, первой destination.
Example:
dumper = Dumper.build do name :hits_by_day fields => { :company_id => :integer, :value => :integer, :date => :date, :start_month_date => :date, } destination do model CompanyStatisticTotalByDay take :company_id, :pages, :date key_fields :company_id, :date increment_fields :pages map :pages, :to => :value condition 'target.date = :date' end destination do model CompanyStatisticTotalByMonth take :company_id, :pages, :date key_fields :company_id, :date increment_fields :pages map :pages, :to => :value map :date, :to => :start_month_date condition 'target.date = :start_month_date' end on_before_merge do |dumper, connection| dumper.common_params = { :date => dumper.date.strftime('%Y-%m-%d'), :start_month_date => dumper.date.beginning_of_month.strftime('%Y-%m-%d'), } end end dumper.process!(counter, date: Date.yesterday)
В результате все данные счетчика за вчера, будут смерджены в целевые таблицы, по ключевым полям: company_id и date, причем все поля кроме pages, будут просто записаны в таблицы, а поле pages будет инкрементировано с текущим значением, при обновлении. Данные будут удалены из счетчика. Все действия производятся транзакционно, как в БД, так и в Redis.
Constants
- DATE_FORMAT
Attributes
Хеш общий параметров. Данные хеш мерджится в каждую, поступающую от счетчика, строку данных.
Массив, целевых моделей для сохранение данных, Array. Каждый элемент массива это экземпляр класса Engine::Destination.
Список доступных для сохранение в целевые таблицы полей и их типов данных, в виде Hash. Доступны следующие типы данных: string, integer, date, timestamp, boolean. Преобразование типов производится непосредственно перед мерджем в целевые таблицы.
Example:
fields = {:company_id => :integer, :date => :timestamp}
Название дампера
Название temp таблицы, используемой для переноса данных. По умолчанию: “tmp_#{dumper_name}”
Public Class Methods
# File lib/redis_counters/dumpers/engine.rb, line 148 def initialize @destinations = [] @common_params = {} end
Public Instance Methods
# File lib/redis_counters/dumpers/engine.rb, line 153 def fields=(value) @fields = value.with_indifferent_access end
# File lib/redis_counters/dumpers/engine.rb, line 157 def name=(value) @name = value @temp_table_name = "tmp_#{@name}" end
Public: Производит перенос данных счетчика.
counter - экземпляр счетчика. args - Hash - набор аргументов(кластер и/или партиции) для переноса данных.
Returns Fixnum - кол-во обработанных строк.
# File lib/redis_counters/dumpers/engine.rb, line 133 def process!(counter, args = {}) @counter = counter @args = args db_transaction do merge_data start_redis_transaction delete_from_redis end commit_redis_transaction rows_processed end
Protected Instance Methods
# File lib/redis_counters/dumpers/engine.rb, line 261 def analyze_table db_connection.execute <<-SQL ANALYZE #{temp_table_name} SQL end
# File lib/redis_counters/dumpers/engine.rb, line 215 def batch_data @current_batch.map! do |row| values = row.map do |field, value| next 'null' if value.nil? fields.fetch(field).eql?(:integer) ? value : quote(value) end "(#{values.join(',')})" end.join(',') end
# File lib/redis_counters/dumpers/engine.rb, line 267 def columns_definition @fields.map do |field, type| pg_field_type = case type when :string, :text 'character varying(4000)' when :integer, :serial, :number 'integer' when :date, :timestamp, :boolean, :hstore type.to_s else if type.is_a?(Array) && type.first == :enum type.last.fetch(:name) else raise 'Unknown datatype %s for %s field' % [type, field] end end "#{field} #{pg_field_type}" end.join(',') end
# File lib/redis_counters/dumpers/engine.rb, line 249 def create_temp_table db_connection.execute <<-SQL CREATE TEMP TABLE #{temp_table_name} ( #{columns_definition} ) ON COMMIT DROP SQL end
# File lib/redis_counters/dumpers/engine.rb, line 288 def db_connection destinations.first.connection end
# File lib/redis_counters/dumpers/engine.rb, line 226 def delete_from_redis redis_session.pipelined do |redis| counter.partitions(args).each do |partition| counter.delete_partition_direct!(args.merge(partition), redis) end end fire_callback(:on_after_delete, self, redis_session) end
# File lib/redis_counters/dumpers/engine.rb, line 257 def drop_temp_table db_connection.execute "DROP TABLE #{temp_table_name}" end
# File lib/redis_counters/dumpers/engine.rb, line 187 def fill_temp_table @rows_processed = counter.data(args) do |batch| @current_batch = batch prepare_batch insert_batch end end
# File lib/redis_counters/dumpers/engine.rb, line 209 def insert_batch db_connection.execute <<-SQL INSERT INTO #{temp_table_name} VALUES #{batch_data} SQL end
# File lib/redis_counters/dumpers/engine.rb, line 171 def merge_data fire_callback(:on_before_merge, self, db_connection) # копируем данные счетчика в временную таблицу create_temp_table fill_temp_table analyze_table # мерджим в целевые таблицы destinations.each { |dest| dest.merge } fire_callback(:on_after_merge, self, db_connection) drop_temp_table end
# File lib/redis_counters/dumpers/engine.rb, line 195 def prepare_batch fields_keys = fields.keys @current_batch.map! do |row| row.merge!(common_params) fire_callback(:on_prepare_row, self, row) # выбираем из хеша только указанные поля fields_keys.inject(HashWithIndifferentAccess.new) do |result, (field)| result.merge!(field => row.fetch(field)) end end end
# File lib/redis_counters/dumpers/engine.rb, line 236 def redis_session return @redis_session if defined?(@redis_session) client = if Gem::Version.new(::Redis::VERSION) < Gem::Version.new('4') counter.redis.client else counter.redis._client end redis = ::Redis.new(client.options) @redis_session = ::Redis::Namespace.new(counter.redis.namespace, redis: redis) end