class Fluent::DynamoDBStreamsInput

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_dynamodb_streams.rb, line 10
def initialize
  super
  require 'aws-sdk'
  require 'bigdecimal'
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_dynamodb_streams.rb, line 25
def configure(conf)
  super

  if @aws_region == "ddblocal"
    @aws_region = "ap-northeast-1" # dummy settings
    @stream_endpoint = "http://localhost:8000"
  else
    @stream_endpoint = "https://streams.dynamodb.#{@aws_region}.amazonaws.com"
  end

  unless @pos_file
    log.warn "dynamodb-streams: 'pos_file PATH' parameter is not set to a 'dynamodb-streams' source."
    log.warn "dynamodb-streams: this parameter is highly recommended to save the position to resume."
  end
end
dynamodb_to_hash(hash) click to toggle source
# File lib/fluent/plugin/in_dynamodb_streams.rb, line 187
def dynamodb_to_hash(hash)
  hash.each do |k, v|
    # delete binary attributes
    if v.b || v.bs
      hash.delete(k)
    else
      hash[k] = format_attribute_value(v)
    end
  end
  return hash
end
emit(r) click to toggle source
# File lib/fluent/plugin/in_dynamodb_streams.rb, line 168
def emit(r)
  record = {
    "aws_region" => r.aws_region,
    "event_source" => r.event_source,
    "event_version" => r.event_version,
    "event_id" => r.event_id,
    "event_name" => r.event_name,
    "dynamodb" => {
      "stream_view_type" => r.dynamodb.stream_view_type,
      "sequence_number" => r.dynamodb.sequence_number,
      "size_bytes" => r.dynamodb.size_bytes,
    }
  }
  record["dynamodb"]["keys"] = dynamodb_to_hash(r.dynamodb.keys) if r.dynamodb.keys
  record["dynamodb"]["old_image"] = dynamodb_to_hash(r.dynamodb.old_image) if r.dynamodb.old_image
  record["dynamodb"]["new_image"] = dynamodb_to_hash(r.dynamodb.new_image) if r.dynamodb.new_image
  router.emit(@tag, Time.now.to_i, record)
end
format_attribute_value(v) click to toggle source
# File lib/fluent/plugin/in_dynamodb_streams.rb, line 199
def format_attribute_value(v)
  if v.m
    return dynamodb_to_hash(v.m)
  elsif v.l
    return v.l.map {|i| format_attribute_value(i) }
  elsif v.ns
    return v.ns.map {|i| BigDecimal.new(i).to_i }
  elsif v.ss
    return v.ss
  elsif v.null
    return null
  elsif v.bool
    return v.bool
  elsif v.n
    return BigDecimal.new(v.n).to_i
  elsif v.s
    return v.s
  else
    log.warn "dynamodb-streams: unknown attribute value."
  end
end
get_shards() click to toggle source
# File lib/fluent/plugin/in_dynamodb_streams.rb, line 100
def get_shards()
  shards = []

  last_shard_id = nil
  begin
    s = @client.describe_stream({
      stream_arn: @stream_arn,
      exclusive_start_shard_id: last_shard_id,
    }).stream_description

    shards = shards + s.shards

    if s.last_evaluated_shard_id == last_shard_id then
      break
    end
    last_shard_id = s.last_evaluated_shard_id
  end while last_shard_id

  shards
end
load_sequence(shard_id) click to toggle source
# File lib/fluent/plugin/in_dynamodb_streams.rb, line 138
def load_sequence(shard_id)
  if @pos_file
    return nil unless File.exist?("#{@pos_file}.#{shard_id}")
    File.read("#{@pos_file}.#{shard_id}").chomp
  else
    return nil unless @pos_memory[shard_id]
    @pos_memory[shard_id]
  end
end
remove_sequence(shard_id) click to toggle source
# File lib/fluent/plugin/in_dynamodb_streams.rb, line 159
def remove_sequence(shard_id)
  if @pos_file
    return unless File.exist?("#{@pos_file}.#{shard_id}")
    File.unlink("#{@pos_file}.#{shard_id}")
  else
    @pos_memory[shard_id] = nil
  end
end
run() click to toggle source
# File lib/fluent/plugin/in_dynamodb_streams.rb, line 65
def run
  while @running
    sleep @fetch_interval

    get_shards.each do |s|
      if s.sequence_number_range.ending_sequence_number
        remove_sequence(s.shard_id)
        next
      end

      set_iterator(s.shard_id) unless @iterator.key? s.shard_id

      resp = @client.get_records({
        shard_iterator: @iterator[s.shard_id],
        limit: @fetch_size,
      })

      resp.records.each do |r|
        begin
          emit(r)
        rescue => e
          log.error "dynamodb-streams: error has occoured.", error: e.message, error_class: e.class
        end
        save_sequence(s.shard_id, r.dynamodb.sequence_number)
      end

      if resp.next_shard_iterator
        @iterator[s.shard_id] = resp.next_shard_iterator
      else
        @iterator.delete s.shard_id
      end
    end
  end
end
save_sequence(shard_id, sequence) click to toggle source
# File lib/fluent/plugin/in_dynamodb_streams.rb, line 148
def save_sequence(shard_id, sequence)
  if @pos_file
    open("#{@pos_file}.#{shard_id}", 'w') do |f|
      f.write sequence
    end
  else
    @pos_memory[shard_id] = sequence
  end
  sequence
end
set_iterator(shard_id) click to toggle source
# File lib/fluent/plugin/in_dynamodb_streams.rb, line 121
def set_iterator(shard_id)
  if load_sequence(shard_id)
    @iterator[shard_id] = @client.get_shard_iterator({
      stream_arn: @stream_arn,
      shard_id: shard_id,
      shard_iterator_type: "AFTER_SEQUENCE_NUMBER",
      sequence_number: load_sequence(shard_id),
    }).shard_iterator
  else
    @iterator[shard_id] = @client.get_shard_iterator({
      stream_arn: @stream_arn,
      shard_id: shard_id,
      shard_iterator_type: "TRIM_HORIZON",
    }).shard_iterator
  end
end
shutdown() click to toggle source
# File lib/fluent/plugin/in_dynamodb_streams.rb, line 60
def shutdown
  @running = false
  @thread.join
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_dynamodb_streams.rb, line 41
def start
  super

  unless @pos_file
    @pos_memory = {}
  end

  options = {}
  options[:region] = @aws_region if @aws_region
  options[:credentials] = Aws::Credentials.new(@aws_key_id, @aws_sec_key) if @aws_key_id && @aws_sec_key
  options[:endpoint] = @stream_endpoint
  @client = Aws::DynamoDBStreams::Client.new(options)

  @iterator = {}

  @running = true
  @thread = Thread.new(&method(:run))
end