class Fluent::Plugin::PostgresReplicatorInput
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_postgres_replicator.rb, line 19
#
# Finalizes the plugin configuration after the superclass has processed it.
#
# Converts @interval through Fluent::Config.time_value and turns the
# comma-separated @primary_keys string into an array of column names.
# Whitespace around every name is stripped and empty entries (for example
# from a trailing comma) are dropped, so " id, name ," yields
# ["id", "name"]. A name with stray whitespace would never match a row
# column, which makes poll reject every batch.
#
# @param conf the configuration object, forwarded to the superclass
def configure(conf)
  super
  @interval = Fluent::Config.time_value(@interval)
  @primary_keys = @primary_keys.split(',').map(&:strip).reject(&:empty?)
end
emit_record(tag, record)
click to toggle source
# File lib/fluent/plugin/in_postgres_replicator.rb, line 105
#
# Hands one record to the plugin's router under the given tag, stamped with
# the engine's current time.
#
# @param tag the tag to emit under
# @param record the event payload (poll passes one result row per call)
def emit_record(tag, record)
  emitted_at = Fluent::Engine.now
  router.emit(tag, emitted_at, record)
end
format_tag(tag, param)
click to toggle source
# File lib/fluent/plugin/in_postgres_replicator.rb, line 97
#
# Expands the placeholders in a tag template.
#
# Supported placeholders:
#   ${event}         - param[:event] converted to a string
#   ${primary_keys}  - the configured primary key names joined with "_"
#
# Any other token of the form ${lowercase_or_underscore} is reported with a
# warning and replaced by an empty string.
#
# @param tag [String] tag template, left unmodified
# @param param [Hash] values for the placeholders; only :event is read
# @return [String] a new string with every placeholder substituted
def format_tag(tag, param)
  substitutions = {
    '${event}' => param[:event].to_s,
    '${primary_keys}' => @primary_keys.join('_')
  }
  tag.gsub(/\$\{[a-z_]+\}/) do |placeholder|
    unless substitutions.key?(placeholder)
      log.warn "placeholder value is not found. :tag=>#{tag} :placeholder=>#{placeholder}"
    end
    # Unknown placeholders deliberately collapse to an empty string.
    substitutions.fetch(placeholder, '')
  end
end
get_connection()
click to toggle source
# File lib/fluent/plugin/in_postgres_replicator.rb, line 109
#
# Opens a new PostgreSQL connection from the configured @host, @port,
# @username, @password and @database.
#
# A failed attempt is logged as a warning and repeated after @interval
# seconds, without an upper bound, so the method only returns once a
# connection has been established. Only StandardError is retried: signals,
# SystemExit and other non-standard exceptions propagate instead of being
# swallowed by the retry loop.
#
# @return [PG::Connection] the newly opened connection
def get_connection
  settings = {
    host: @host,
    port: @port,
    user: @username,
    password: @password,
    dbname: @database
  }
  PG::Connection.new(settings)
rescue StandardError => e
  log.warn "failed to get connection and will retry. error: #{e}"
  sleep @interval
  retry
end
poll()
click to toggle source
# File lib/fluent/plugin/in_postgres_replicator.rb, line 47
#
# Polls the database forever, emitting an event for every new or changed row.
#
# Each cycle runs @sql, then for every returned row:
# * collects the values of the configured primary key columns; if any of
#   them is nil an error is logged and the rest of that batch is skipped,
# * fingerprints the row and compares it with the fingerprint remembered for
#   the same primary key from earlier cycles,
# * emits the row with event :insert when the key is new, or :update when the
#   fingerprint differs; unchanged rows emit nothing.
# The connection is closed at the end of every cycle and the method sleeps
# for @interval seconds before the next one.
#
# Remembered fingerprints live only in this call; rows that disappear from
# the result set are neither reported nor forgotten.
#
# @return never returns under normal operation; exceptions raised by the
#   collaborators propagate to the caller after the connection is closed
def poll
  hash_values = {}
  conn = get_connection
  loop do
    rows_count = 0
    # Monotonic clock: the measured duration is unaffected by wall-clock jumps.
    started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    rows, conn = query(@sql, conn)
    rows.each do |row|
      row_ids = @primary_keys.map { |primary_key| row[primary_key] }.compact
      if row_ids.size != @primary_keys.size
        log.error "primary_keys column value is something wrong. :tag=>#{@tag} :primary_keys=>#{@primary_keys}"
        break
      end

      # inspect keeps field boundaries and keeps NULL (nil) distinct from '',
      # so a NULL -> '' change is detected as an update.
      hash_value = Digest::SHA1.hexdigest(row.to_a.inspect)
      # The key array itself is the lookup key, so composite keys such as
      # ["a_b", "c"] and ["a", "b_c"] cannot collide.
      event =
        if !hash_values.key?(row_ids)
          :insert
        elsif hash_values[row_ids] != hash_value
          :update
        end
      emit_record(format_tag(@tag, { event: event }), row) if event

      hash_values[row_ids] = hash_value
      rows_count += 1
    end
    conn.close
    elapsed_time = sprintf('%0.02f', Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at)
    log.info "success to execute replicator. :tag=>#{@tag} :rows_count=>#{rows_count} :elapsed_time=>#{elapsed_time} sec"
    sleep @interval
  end
ensure
  # Do not leak the connection when the loop is aborted mid-cycle.
  conn.close if conn && !conn.finished?
end
query(sql, conn = nil)
click to toggle source
# File lib/fluent/plugin/in_postgres_replicator.rb, line 86
#
# Runs a SQL statement, reconnecting and retrying until it succeeds.
#
# A nil or already finished connection is replaced through get_connection
# before the statement is sent. When the attempt fails with a StandardError
# the failure is logged, the connection that failed is closed and discarded
# (a broken connection does not report itself as finished, so it would
# otherwise be reused on every retry), and the statement is retried on a
# fresh connection after @interval seconds. There is no retry limit.
# Non-standard exceptions (signals, SystemExit, ...) propagate.
#
# @param sql [String] statement to execute
# @param conn the connection to reuse, or nil to open a new one
# @return [Array] two elements: the query result and the connection that
#   produced it (which may differ from the one passed in)
def query(sql, conn = nil)
  conn = get_connection if conn.nil? || conn.finished?
  [conn.query(sql), conn]
rescue StandardError => e
  log.warn "failed to execute query and will retry. error: #{e}"
  begin
    conn.close unless conn.nil? || conn.finished?
  rescue StandardError
    # Best effort only: the connection is being thrown away regardless.
    nil
  end
  conn = nil # makes the retry below open a fresh connection
  sleep @interval
  retry
end
run()
click to toggle source
# File lib/fluent/plugin/in_postgres_replicator.rb, line 38
#
# Entry point of the polling thread: runs poll and, should it fail with a
# StandardError, logs the error message followed by the backtrace instead of
# letting the exception escape.
def run
  poll
rescue StandardError => error
  log.error "failed to execute query. error: #{error.message}"
  log.error error.backtrace.join("\n")
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_postgres_replicator.rb, line 32
#
# Stops the polling thread and then delegates to the superclass.
#
# The thread is only killed when one was created by start; without that
# guard a shutdown that happens before start would raise a TypeError and the
# superclass shutdown would never run.
def shutdown
  @thread.kill if @thread
  super
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_postgres_replicator.rb, line 26
#
# Starts the plugin: after the superclass has started, spawns the background
# thread that executes run and keeps it in @thread.
def start
  super
  @thread = Thread.new { run }
end