class Fluent::Plugin::PostgresReplicatorInput

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_postgres_replicator.rb, line 19
def configure(conf)
  super

  @interval = Fluent::Config.time_value(@interval)
  @primary_keys = @primary_keys.split(/\s*,\s*/)
end
emit_record(tag, record) click to toggle source
# File lib/fluent/plugin/in_postgres_replicator.rb, line 105
def emit_record(tag, record)
  router.emit(tag, Fluent::Engine.now, record)
end
format_tag(tag, param) click to toggle source
# File lib/fluent/plugin/in_postgres_replicator.rb, line 97
def format_tag(tag, param)
  pattern = {'${event}' => param[:event].to_s, '${primary_keys}' => @primary_keys.join('_')}
  tag.gsub(/(\${[a-z_]+})/) do
    log.warn "placeholder value is not found. :tag=>#{tag} :placeholder=>#{$1}" unless pattern.include?($1)
    pattern[$1]
  end
end
get_connection() click to toggle source
# File lib/fluent/plugin/in_postgres_replicator.rb, line 109
def get_connection
  begin
    return PG::Connection.new({
      :host => @host,
      :port => @port,
      :user => @username,
      :password => @password,
      :dbname => @database
    })
  rescue Exception => e
    log.warn "failed to get connection and will retry. error: #{e}"
    sleep @interval
    retry
  end
end
poll() click to toggle source
# File lib/fluent/plugin/in_postgres_replicator.rb, line 47
def poll
  hash_values = Hash.new
  conn = get_connection()
  loop do
    rows_count = 0
    start_time = Time.now
    rows, conn = query(@sql, conn)
    rows.each do |row|
      row_ids = Array.new
      @primary_keys.each do |primary_key|
        if !row[primary_key].nil?
          row_ids << row[primary_key]
        end
      end
      if row_ids.size != @primary_keys.size
        log.error "primary_keys column value is something wrong. :tag=>#{@tag} :primary_keys=>#{@primary_keys}"
        break
      end

      hash_value_id = row_ids.join('_')
      hash_value = Digest::SHA1.hexdigest(row.flatten.join)
      if !hash_values.include?(hash_value_id)
        tag = format_tag(@tag, {:event => :insert})
        emit_record(tag, row)
      elsif hash_values[hash_value_id] != hash_value
        tag = format_tag(@tag, {:event => :update})
        emit_record(tag, row)
      end
      hash_values[hash_value_id] = hash_value
      rows_count += 1
    end
    conn.close
    elapsed_time = sprintf('%0.02f', Time.now - start_time)
    log.info "success to execute replicator. :tag=>#{@tag} :rows_count=>#{rows_count} :elapsed_time=>#{elapsed_time} sec"
    sleep @interval
  end

end
query(sql, conn = nil) click to toggle source
# File lib/fluent/plugin/in_postgres_replicator.rb, line 86
def query(sql, conn = nil)
  begin
    conn = (conn.nil? || conn.finished?) ? get_connection : conn
    return conn.query(sql), conn
  rescue Exception => e
    log.warn "failed to execute query and will retry. error: #{e}"
    sleep @interval
    retry
  end
end
run() click to toggle source
# File lib/fluent/plugin/in_postgres_replicator.rb, line 38
def run
  begin
    poll
  rescue StandardError => e
    log.error "failed to execute query. error: #{e.message}"
    log.error e.backtrace.join("\n")
  end
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_postgres_replicator.rb, line 32
def shutdown
  Thread.kill(@thread)

  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_postgres_replicator.rb, line 26
def start
  super

  @thread = Thread.new(&method(:run))
end