class Fluent::DynamoDBStreamsInput
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_dynamodb_streams.rb, line 10
# Builds the plugin instance: runs the superclass initializer, then loads the
# libraries this plugin relies on. The requires happen at instantiation time
# rather than at file-load time.
def initialize
  super
  %w[aws-sdk bigdecimal].each { |library| require library }
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_dynamodb_streams.rb, line 25
# Applies the configuration (via the superclass) and derives the streams
# endpoint from the configured region.
#
# The special region name "ddblocal" selects a local endpoint on
# http://localhost:8000 and swaps in a placeholder region; any other region
# maps to the regional streams.dynamodb.<region>.amazonaws.com endpoint.
#
# Emits two warnings when no pos_file is configured.
#
# @param conf the configuration element handed to the superclass
def configure(conf)
  super

  local = @aws_region == "ddblocal"
  # dummy settings: a region value is still set when talking to the local endpoint
  @aws_region = "ap-northeast-1" if local
  @stream_endpoint =
    if local
      "http://localhost:8000"
    else
      "https://streams.dynamodb.#{@aws_region}.amazonaws.com"
    end

  return if @pos_file

  log.warn "dynamodb-streams: 'pos_file PATH' parameter is not set to a 'dynamodb-streams' source."
  log.warn "dynamodb-streams: this parameter is highly recommended to save the position to resume."
end
dynamodb_to_hash(hash)
click to toggle source
# File lib/fluent/plugin/in_dynamodb_streams.rb, line 187
# Converts a map of attribute name => attribute value object into a plain
# hash of attribute name => formatted value.
#
# Attributes whose value carries binary data (`b` or `bs` set) are omitted
# from the result.
#
# A new hash is built and returned; the argument is left untouched. Deleting
# from / overwriting the caller's hash while iterating over it would alter the
# response object the hash came from and is easy to get wrong.
#
# @param hash [Hash] attribute name => object responding to `b` and `bs`
#   (plus whatever format_attribute_value reads)
# @return [Hash] attribute name => value returned by format_attribute_value
def dynamodb_to_hash(hash)
  hash.each_with_object({}) do |(name, attribute), converted|
    # drop binary attributes
    next if attribute.b || attribute.bs

    converted[name] = format_attribute_value(attribute)
  end
end
emit(r)
click to toggle source
# File lib/fluent/plugin/in_dynamodb_streams.rb, line 168
# Turns one stream record into an event hash and hands it to the router under
# @tag, timestamped with the current wall-clock time in seconds.
#
# The "dynamodb" sub-hash always contains the view type, sequence number and
# size; "keys", "old_image" and "new_image" are added only when the record
# provides them, each converted with dynamodb_to_hash.
#
# @param r stream record responding to aws_region, event_source,
#   event_version, event_id, event_name and dynamodb
def emit(r)
  change = r.dynamodb

  details = {
    "stream_view_type" => change.stream_view_type,
    "sequence_number" => change.sequence_number,
    "size_bytes" => change.size_bytes,
  }

  # Optional parts, in the order they appear in the emitted hash.
  optional_parts = [
    ["keys", change.keys],
    ["old_image", change.old_image],
    ["new_image", change.new_image],
  ]
  optional_parts.each do |field, attributes|
    details[field] = dynamodb_to_hash(attributes) if attributes
  end

  event = {
    "aws_region" => r.aws_region,
    "event_source" => r.event_source,
    "event_version" => r.event_version,
    "event_id" => r.event_id,
    "event_name" => r.event_name,
    "dynamodb" => details,
  }

  router.emit(@tag, Time.now.to_i, event)
end
format_attribute_value(v)
click to toggle source
# File lib/fluent/plugin/in_dynamodb_streams.rb, line 199
# Converts a single attribute value object into a plain Ruby value, based on
# which of its typed fields is set:
#
#   m    -> Hash (converted recursively via dynamodb_to_hash)
#   l    -> Array (each element converted recursively)
#   ns   -> Array of Integer
#   ss   -> returned as-is
#   null -> nil
#   bool -> true / false
#   n    -> Integer
#   s    -> returned as-is
#
# Numbers are parsed with BigDecimal and then truncated with to_i, so any
# fractional part is discarded.
#
# When none of the fields is set a warning is logged and nil is returned.
#
# @param v object responding to m, l, ns, ss, null, bool, n and s
# @return [Hash, Array, Integer, String, true, false, nil]
def format_attribute_value(v)
  if v.m
    dynamodb_to_hash(v.m)
  elsif v.l
    v.l.map { |item| format_attribute_value(item) }
  elsif v.ns
    # Kernel#BigDecimal instead of BigDecimal.new, which newer bigdecimal
    # releases no longer provide.
    v.ns.map { |number| BigDecimal(number).to_i }
  elsif v.ss
    v.ss
  elsif v.null
    # Ruby's null value is nil; a bare `null` raises NameError.
    nil
  elsif !v.bool.nil?
    # Test for presence, not truthiness, so that a stored `false` is returned
    # as false instead of falling through to the "unknown" branch.
    v.bool
  elsif v.n
    BigDecimal(v.n).to_i
  elsif v.s
    v.s
  else
    log.warn "dynamodb-streams: unknown attribute value."
    nil
  end
end
get_shards()
click to toggle source
# File lib/fluent/plugin/in_dynamodb_streams.rb, line 100
# Collects every shard of the stream @stream_arn by calling describe_stream
# repeatedly, each time passing the previous page's last_evaluated_shard_id as
# exclusive_start_shard_id (nil on the first call).
#
# Paging stops when a page reports no last_evaluated_shard_id, or when the
# reported id equals the one that was just sent (no progress).
#
# @return [Array] the shards of all pages, in the order they were returned
def get_shards()
  collected = []
  cursor = nil

  loop do
    request = {
      stream_arn: @stream_arn,
      exclusive_start_shard_id: cursor,
    }
    page = @client.describe_stream(request).stream_description
    collected += page.shards

    following = page.last_evaluated_shard_id
    break if following == cursor

    cursor = following
    break unless cursor
  end

  collected
end
load_sequence(shard_id)
click to toggle source
# File lib/fluent/plugin/in_dynamodb_streams.rb, line 138
# Looks up the stored sequence number for a shard.
#
# With @pos_file set, the value is the content of "<pos_file>.<shard_id>"
# without its trailing newline; otherwise it is taken from @pos_memory.
#
# @param shard_id identifier used as file suffix / memory key
# @return [String, nil] the stored value, or nil when nothing is stored
def load_sequence(shard_id)
  # In-memory mode: a missing (or falsy) entry is reported as nil.
  return @pos_memory[shard_id] || nil unless @pos_file

  position_path = "#{@pos_file}.#{shard_id}"
  File.exist?(position_path) ? File.read(position_path).chomp : nil
end
remove_sequence(shard_id)
click to toggle source
# File lib/fluent/plugin/in_dynamodb_streams.rb, line 159
# Forgets the stored sequence number of a shard.
#
# With @pos_file set, the file "<pos_file>.<shard_id>" is deleted if it
# exists. Otherwise the entry is removed from @pos_memory.
#
# The in-memory entry is deleted rather than overwritten with nil: assigning
# nil would leave (or even create) a key for every shard id passed in, so the
# hash would keep growing with ids that will never be used again.
#
# @param shard_id identifier used as file suffix / memory key
def remove_sequence(shard_id)
  if @pos_file
    position_path = "#{@pos_file}.#{shard_id}"
    File.unlink(position_path) if File.exist?(position_path)
  else
    @pos_memory.delete(shard_id)
    nil
  end
end
run()
click to toggle source
# File lib/fluent/plugin/in_dynamodb_streams.rb, line 65
# Polling loop. While @running is true, each cycle:
#
#   1. sleeps @fetch_interval,
#   2. lists the shards with get_shards,
#   3. for a shard whose sequence number range has an ending sequence number,
#      drops its saved position and skips it,
#   4. for every other shard, makes sure an iterator exists, fetches up to
#      @fetch_size records, emits each one and saves its sequence number, then
#      stores the next iterator (or forgets the iterator when there is none).
#
# Error handling:
#   * A failure while emitting a single record is logged; the record's
#     sequence number is still saved, so the record is skipped.
#   * A failure while listing shards is logged and the cycle is skipped.
#   * A failure while fetching from one shard (for example an iterator the
#     service no longer accepts, or a rejected request) is logged and the
#     cached iterator for that shard is dropped, so the next cycle rebuilds
#     it through set_iterator. Without this, the exception would escape the
#     loop and end polling for good.
def run
  while @running
    sleep @fetch_interval

    begin
      shards = get_shards
    rescue => e
      log.error "dynamodb-streams: failed to list shards.", error: e.message, error_class: e.class
      next
    end

    shards.each do |s|
      begin
        if s.sequence_number_range.ending_sequence_number
          remove_sequence(s.shard_id)
          next
        end

        set_iterator(s.shard_id) unless @iterator.key? s.shard_id

        resp = @client.get_records({
          shard_iterator: @iterator[s.shard_id],
          limit: @fetch_size,
        })

        resp.records.each do |r|
          begin
            emit(r)
          rescue => e
            log.error "dynamodb-streams: error has occoured.", error: e.message, error_class: e.class
          end
          # Saved even when emit failed: the position always moves forward.
          save_sequence(s.shard_id, r.dynamodb.sequence_number)
        end

        if resp.next_shard_iterator
          @iterator[s.shard_id] = resp.next_shard_iterator
        else
          @iterator.delete s.shard_id
        end
      rescue => e
        # Forget the iterator so the next cycle requests a fresh one.
        @iterator.delete s.shard_id
        log.error "dynamodb-streams: failed to fetch records.", error: e.message, error_class: e.class, shard_id: s.shard_id
      end
    end
  end
end
save_sequence(shard_id, sequence)
click to toggle source
# File lib/fluent/plugin/in_dynamodb_streams.rb, line 148
# Stores the sequence number reached on a shard.
#
# With @pos_file set, the value is written to "<pos_file>.<shard_id>",
# replacing any previous content; otherwise it is kept in @pos_memory.
#
# File.write is used instead of Kernel#open because Kernel#open gives special
# meaning to a path starting with "|" (it runs a command), which must never
# happen for a path assembled from configuration and shard ids.
#
# @param shard_id identifier used as file suffix / memory key
# @param sequence the sequence number to remember
# @return the sequence that was passed in
def save_sequence(shard_id, sequence)
  if @pos_file
    File.write("#{@pos_file}.#{shard_id}", sequence)
  else
    @pos_memory[shard_id] = sequence
  end
  sequence
end
set_iterator(shard_id)
click to toggle source
# File lib/fluent/plugin/in_dynamodb_streams.rb, line 121
# Requests a shard iterator for the given shard and caches it in
# @iterator[shard_id].
#
# When a position has been saved for the shard, the iterator starts right
# after that sequence number (AFTER_SEQUENCE_NUMBER); otherwise it starts at
# TRIM_HORIZON.
#
# The saved position is loaded exactly once, so the value that decides the
# iterator type is the same value that is sent in the request. An empty saved
# position (e.g. an empty position file) is treated like a missing one rather
# than being sent as an empty sequence number.
#
# @param shard_id id of the shard to create the iterator for
# @return the iterator returned by the client
def set_iterator(shard_id)
  request = {
    stream_arn: @stream_arn,
    shard_id: shard_id,
  }

  sequence = load_sequence(shard_id)
  if sequence && !sequence.to_s.empty?
    request[:shard_iterator_type] = "AFTER_SEQUENCE_NUMBER"
    request[:sequence_number] = sequence
  else
    request[:shard_iterator_type] = "TRIM_HORIZON"
  end

  @iterator[shard_id] = @client.get_shard_iterator(request).shard_iterator
end
shutdown()
click to toggle source
# File lib/fluent/plugin/in_dynamodb_streams.rb, line 60
# Stops the polling loop: clears @running so the loop in `run` exits after its
# current cycle, then waits for the polling thread to finish.
#
# The join is skipped when no thread exists (e.g. `start` never got as far as
# creating it), so shutting down a plugin that did not start cleanly does not
# raise NoMethodError on nil.
#
# NOTE(review): unlike initialize/configure/start, this method does not call
# super — confirm whether the base class expects it.
def shutdown
  @running = false
  @thread.join if @thread
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_dynamodb_streams.rb, line 41
# Starts the plugin: runs the superclass start, prepares position storage,
# creates the streams client and launches the polling thread.
#
# Client options:
#   :region      - only when @aws_region is set
#   :credentials - only when both @aws_key_id and @aws_sec_key are set
#   :endpoint    - always, taken from @stream_endpoint
def start
  super

  # Positions are kept in memory only when no position file is configured.
  @pos_memory = {} unless @pos_file

  static_credentials = nil
  if @aws_key_id && @aws_sec_key
    static_credentials = Aws::Credentials.new(@aws_key_id, @aws_sec_key)
  end

  candidates = {
    region: @aws_region,
    credentials: static_credentials,
    endpoint: @stream_endpoint,
  }
  # Unset region/credentials are left out entirely; the endpoint is always passed.
  client_options = candidates.reject { |name, value| name != :endpoint && !value }

  @client = Aws::DynamoDBStreams::Client.new(client_options)

  @iterator = {}
  @running = true
  @thread = Thread.new { run }
end