class DynamoDB::Streams::Client::CLI

Constants

ITOR_WAIT

Public Instance Methods

client() click to toggle source
# File lib/dynamodb/streams/client/cli.rb, line 91
# Returns the DynamoDB Streams client used by the CLI commands, building it
# on first use and reusing the same instance afterwards.
#
# Credentials and endpoint come from the 'access-key', 'secret-key' and
# 'endpoint' options, each falling back to its environment variable
# (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, DYNAMODB_STREAMS_ENDPOINT) when
# the option key is absent. The region is taken from the 'region' option only.
#
# @return [DynamoDB::Streams::Client] the memoized client
def client
  unless @client
    settings = {
      access_key_id: options.fetch('access-key', ENV['AWS_ACCESS_KEY_ID']),
      secret_access_key: options.fetch('secret-key', ENV['AWS_SECRET_ACCESS_KEY']),
      endpoint: options.fetch('endpoint', ENV['DYNAMODB_STREAMS_ENDPOINT']),
      region: options['region'],
    }

    @client = DynamoDB::Streams::Client.new(settings)
    # Debug mode is switched on only when the 'debug' option is truthy.
    @client.debug = true if options['debug']
  end

  @client
end
describe_stream(stream_id) click to toggle source
# File lib/dynamodb/streams/client/cli.rb, line 22
# Prints the result of the 'DescribeStream' query for the given stream as
# pretty-printed JSON on standard output.
#
# The query is driven through #iterate with 'ShardId' as the paging item,
# so the printed document is whatever #iterate returns after paging.
#
# @param stream_id [String] value sent as 'StreamId' in the request
# @return [nil] the return value of puts
def describe_stream(stream_id)
  description = iterate('ShardId', {'StreamId' => stream_id}) { |request| client.query('DescribeStream', request) }

  puts JSON.pretty_generate(description)
end
get_records(shard_iterator) click to toggle source
# File lib/dynamodb/streams/client/cli.rb, line 52
# Runs the 'GetRecords' query starting at the given shard iterator and prints
# each response as pretty-printed JSON.
#
# When the 'limit' option is set it is sent as 'Limit'. When the 'follow'
# option is truthy, the method keeps querying with each response's
# 'NextShardIterator' until a response carries none; otherwise it stops
# after the first response.
#
# NOTE(review): follow mode issues the next query immediately, with no pause
# between polls. The class declares ITOR_WAIT but this method does not use
# it - confirm whether a delay was intended here.
#
# @param shard_iterator [String] value sent as the initial 'ShardIterator'
# @return [nil]
def get_records(shard_iterator)
  request = {'ShardIterator' => shard_iterator}
  limit = options['limit']
  request['Limit'] = limit if limit

  loop do
    page = client.query('GetRecords', request)
    puts JSON.pretty_generate(page)

    upcoming = page['NextShardIterator']
    # Stop unless we are following AND the service handed back a continuation.
    break unless options['follow'] && upcoming

    request['ShardIterator'] = upcoming
  end
end
get_shard_iterator(stream_id, shard_id, shard_iterator_type) click to toggle source
# File lib/dynamodb/streams/client/cli.rb, line 34
# Runs the 'GetShardIterator' query and prints the response as
# pretty-printed JSON.
#
# The iterator type is upper-cased before it is sent. When the
# 'sequence-number' option is set, it is included as 'SequenceNumber'.
#
# @param stream_id [String] value sent as 'StreamId'
# @param shard_id [String] value sent as 'ShardId'
# @param shard_iterator_type [String] sent upper-cased as 'ShardIteratorType'
# @return [nil] the return value of puts
def get_shard_iterator(stream_id, shard_id, shard_iterator_type)
  request = {
    'StreamId' => stream_id,
    'ShardId' => shard_id,
    'ShardIteratorType' => shard_iterator_type.upcase,
  }

  sequence_number = options['sequence-number']
  request['SequenceNumber'] = sequence_number if sequence_number

  puts JSON.pretty_generate(client.query('GetShardIterator', request))
end
iterate(item, req_hash = {}) { |req_hash| ... } click to toggle source
# File lib/dynamodb/streams/client/cli.rb, line 70
# Repeatedly yields the request hash to the block, merging every response
# into one result hash, until a response has no "LastEvaluated<item>" value.
#
# After each response that carries "LastEvaluated<item>", that value is
# written into req_hash as "ExclusiveStart<item>" before the next yield, so
# the caller's hash is mutated in place. The "LastEvaluated<item>" key is
# removed from the merged result before it is returned.
#
# Responses are combined with Hash#deep_merge!, which is not core Ruby -
# presumably supplied elsewhere in the project or by a dependency.
#
# @param item [String] suffix used to build the paging key names
# @param req_hash [Hash] request passed to the block on every call
# @yieldparam req_hash [Hash] the (possibly updated) request
# @yieldreturn [Hash] one response page
# @return [Hash] the merged responses without the "LastEvaluated<item>" key
def iterate(item, req_hash = {})
  start_key = "ExclusiveStart#{item}"
  last_key = "LastEvaluated#{item}"

  merged = {}
  cursor = nil

  loop do
    req_hash[start_key] = cursor if cursor
    page = yield(req_hash)
    merged.deep_merge!(page)

    cursor = page[last_key]
    break unless cursor
  end

  # Earlier pages may have left a paging marker behind in the merged hash.
  merged.delete(last_key)
  merged
end
list_streams() click to toggle source
# File lib/dynamodb/streams/client/cli.rb, line 13
# Prints the result of the 'ListStreams' query as pretty-printed JSON on
# standard output.
#
# The query is driven through #iterate with 'StreamId' as the paging item
# and no initial request parameters.
#
# @return [nil] the return value of puts
def list_streams
  streams = iterate('StreamId') { |request| client.query('ListStreams', request) }

  puts JSON.pretty_generate(streams)
end