class RedTrack::KinesisClient

Constants

DEFAULT_MAX_RECORDS
DEFAULT_MAX_REQUESTS
TAG

Public Class Methods

new(options)

Setup instance variables for kinesis access

@param [Hash] options Expects :redshift_cluster_name, :redshift_dbname. Optionally :verbose
@return [Boolean] Success

# File lib/redtrack_kinesisclient.rb, line 20
def initialize(options)
  @verbose = options[:verbose] || false
  @logger = options[:logger] || Logger.new(STDOUT)
  @options = options
end
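
A minimal construction sketch (the gem require path, cluster name, and database name below are assumptions for illustration):

require 'redtrack'   # assumed gem entry point
require 'logger'

kinesis_client = RedTrack::KinesisClient.new(
  :redshift_cluster_name => 'analytics-cluster',   # hypothetical cluster name
  :redshift_dbname => 'events',                    # hypothetical database name
  :verbose => true,
  :logger => Logger.new(STDOUT)
)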

Public Instance Methods

create_kinesis_stream_for_table(table,shard_count=1)

Create a kinesis stream for the redshift table

@param [String] table The name of the table
@param [Integer] shard_count The number of shards in the stream

# File lib/redtrack_kinesisclient.rb, line 80
def create_kinesis_stream_for_table(table,shard_count=1)
  options = {
      :stream_name => stream_name(table),
      :shard_count => shard_count
  }
  result = AWS.kinesis.client.create_stream(options)
  return result
end
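
For example, creating a two-shard stream for a hypothetical pageviews table, assuming the client constructed above and configured AWS credentials:

kinesis_client.create_kinesis_stream_for_table('pageviews', 2)
# creates the Kinesis stream 'analytics-cluster.events.pageviews' with 2 shards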

get_shard_description(stream_name,stream_shard_index)

Get hash describing the shard from describe_stream

@param [String] stream_name The name of the kinesis stream
@param [Integer] stream_shard_index The index of the shard in the array of shards
@return [Hash] Information regarding the stream shard, from AWS kinesis

# File lib/redtrack_kinesisclient.rb, line 60
def get_shard_description(stream_name,stream_shard_index)
  describe_response = AWS.kinesis.client.describe_stream({:stream_name => stream_name})

  if describe_response != nil && describe_response[:stream_description] != nil
    result = describe_response[:stream_description][:shards][stream_shard_index]
    result[:success] = true
    result[:stream_description] = describe_response[:stream_description]
  else
    result = {
        success: false,
        describe_response: describe_response
    }
  end
  return result
end
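
A usage sketch, reusing the hypothetical stream name from above:

shard = kinesis_client.get_shard_description('analytics-cluster.events.pageviews', 0)
if shard[:success]
  puts shard[:shard_id]   # e.g. "shardId-000000000000"
else
  puts "describe_stream failed: #{shard[:describe_response].inspect}"
end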

get_shard_descriptions(stream_name)

Get an array of hashes describing the stream's shards from describe_stream

@param [String] stream_name The name of the kinesis stream
@return [Array] Information regarding the stream shards

# File lib/redtrack_kinesisclient.rb, line 45
def get_shard_descriptions(stream_name)
  describe_response = AWS.kinesis.client.describe_stream({:stream_name => stream_name})

  result = nil
  if describe_response != nil && describe_response[:stream_description] != nil
    result = describe_response[:stream_description][:shards]
  end
  return result
end
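
A usage sketch (the stream name is hypothetical); the method returns nil when describe_stream yields no stream description:

shards = kinesis_client.get_shard_descriptions('analytics-cluster.events.pageviews')
unless shards.nil?
  shards.each { |shard| puts shard[:shard_id] }
end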

get_shard_iterator_from_sequence_number(stream_name,shard_description,starting_sequence_number=nil)

Get the shard iterator given a checkpointed sequence number. If there is no checkpoint, read from the start of the shard

@param [String] stream_name The name of the stream to get a shard iterator for
@param [Hash] shard_description Result from a describe_stream request
@param [String] starting_sequence_number The sequence number to get a shard iterator for; if nil, an iterator for the start of the shard is returned
@return [String] The shard iterator

# File lib/redtrack_kinesisclient.rb, line 95
def get_shard_iterator_from_sequence_number(stream_name,shard_description,starting_sequence_number=nil)

  ## Get shard iterator
  get_shard_iterator_options = {
      :stream_name => stream_name,
      :shard_id => shard_description[:shard_id]
  }

  ## Options based on starting sequence number
  if starting_sequence_number != nil
    get_shard_iterator_options[:shard_iterator_type] = 'AFTER_SEQUENCE_NUMBER'
    get_shard_iterator_options[:starting_sequence_number] = starting_sequence_number
  else
    @logger.warn("Shard '#{shard_description[:shard_id]}' has no starting sequence number, use TRIM_HORIZON shard iterator")
    get_shard_iterator_options[:shard_iterator_type] = 'TRIM_HORIZON'
  end

  get_shard_iterator_response = AWS.kinesis.client.get_shard_iterator(get_shard_iterator_options)
  shard_iterator = get_shard_iterator_response[:shard_iterator]
  return shard_iterator
end
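
A sketch combining this with get_shard_description; the checkpoint value is hypothetical:

stream = kinesis_client.stream_name('pageviews')
shard = kinesis_client.get_shard_description(stream, 0)

checkpoint = nil   # hypothetical: a sequence number saved from a previous read, or nil
iterator = kinesis_client.get_shard_iterator_from_sequence_number(stream, shard, checkpoint)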

stream_name(redshift_table)

Name of the stream in the data broker (This is a Kinesis stream name)

@param [String] redshift_table Name of the redshift table
@return [String] Name of the stream in Kinesis

# File lib/redtrack_kinesisclient.rb, line 33
def stream_name(redshift_table)
  if @options[:redshift_cluster_name] == nil || @options[:redshift_dbname] == nil
    raise 'Need to specify :redshift_cluster_name and :redshift_dbname as options'
  end
  result= @options[:redshift_cluster_name] + '.' + @options[:redshift_dbname] + ".#{redshift_table}"
  return result
end
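
With the hypothetical options from the constructor example, the naming convention is cluster_name.dbname.table:

kinesis_client.stream_name('pageviews')
# => "analytics-cluster.events.pageviews"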

stream_read_from_shard_iterator_into_files(shard_iterator, files, options={})

Read records from a Kinesis shard and write them into a set of files

@param [String] shard_iterator The shard iterator to start reading from - result of get_shard_iterator
@param [Array] files Array of files to write data into
@param [Hash] options Optional. Can specify :max_records, :max_requests
@return [Hash] Starting and ending sequence numbers, the number of records read, and the next shard iterator

# File lib/redtrack_kinesisclient.rb, line 123
def stream_read_from_shard_iterator_into_files(shard_iterator, files, options={})

  max_records = options[:max_records] || DEFAULT_MAX_RECORDS
  max_requests = options[:max_requests] || DEFAULT_MAX_REQUESTS

  start_sequence_number=nil
  end_sequence_number=nil
  records = 0
  num_files = files.length

  max_requests.times do

    # Execute get_records against AWS Kinesis
    get_records_response = AWS.kinesis.client.get_records({:shard_iterator => shard_iterator})

    # Process records
    if get_records_response != nil && get_records_response.data != nil && get_records_response.data[:records] != nil && get_records_response.data[:records].count > 0
      get_records_response.data[:records].each do |record|

        data_payload = JSON.parse(record[:data])
        data = data_payload['data']

        # rotate which file we write into
        files[records % num_files].puts data + "\n"

        # Sequence numbers
        if (start_sequence_number == nil)
          start_sequence_number = record[:sequence_number].to_i
        end
        if (end_sequence_number == nil || record[:sequence_number].to_i > end_sequence_number)
          end_sequence_number = record[:sequence_number].to_i
        else
          @logger.warn("#{TAG} Out of order sequence number: #{end_sequence_number.to_s}")
        end

        # Increment records read; check exit condition
        records+=1
        if (records >= max_records)
          break
        end
      end
    end

    # set shard iterator for next request from payload
    shard_iterator=get_records_response.data[:next_shard_iterator]

    # Check exit conditions
    if(shard_iterator == nil || records >= max_records)
      break
    end
  end

  result = {
    starting_sequence_number: start_sequence_number.to_s,
    ending_sequence_number: end_sequence_number.to_s,
    next_shard_iterator: shard_iterator,
    records: records
  }
  return result
end
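
A sketch that drains a shard into two local files (file names and limits are illustrative; iterator comes from get_shard_iterator_from_sequence_number):

files = [File.open('shard0_part0.txt', 'w'), File.open('shard0_part1.txt', 'w')]
result = kinesis_client.stream_read_from_shard_iterator_into_files(iterator, files,
  :max_records => 500, :max_requests => 5)
files.each(&:close)

puts "read #{result[:records]} records, next iterator: #{result[:next_shard_iterator]}"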

stream_write(stream_name,data_string,partition_key=nil)

Write data to a stream. This expects the data to be a serialized string

@param [String] stream_name The name of the stream
@param [String] data_string String of data to write
@param [String] partition_key How to keep the data partitioned in kinesis. See docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#Kinesis-PutRecord-request-PartitionKey
@return [Boolean] True - the write to the stream succeeded

# File lib/redtrack_kinesisclient.rb, line 190
def stream_write(stream_name,data_string,partition_key=nil)
  result=false

  partition_key = partition_key || rand(100).to_s

  put_data = {
    :data => data_string
  }

  put_options = {
    :stream_name => stream_name,
    :partition_key => partition_key,
    :data => put_data.to_json
  }

  @logger.debug("#{TAG} write to #{stream_name} stream with data #{data_string}")

  # Write to kinesis; up to 3 attempts with a 1 second backoff between retries
  attempt_count = 3
  last_exception = nil
  while attempt_count > 0 && !result
    attempt_count -= 1
    begin
      put_record_result = AWS.kinesis.client.put_record(put_options)
      @logger.debug("#{TAG} put_record result #{put_record_result.to_json}")
      if put_record_result.http_response.status < 300
        result = true
      else
        @logger.warn("#{TAG} put_record response: HTTP #{put_record_result.http_response.status}: #{put_record_result.http_response.body}")
      end
    rescue Exception => e

      # log exception and retry after a 1 second backoff
      @logger.warn("#{TAG} put_record Exception caught #{e.class}: #{e.message}\n\t#{e.backtrace.join("\n\t")}")
      last_exception = e
      sleep(1)
    end
  end

  # If still failing after 3 attempts, raise the last exception (or a generic error if none was raised)
  if !result
    raise last_exception || "#{TAG} put_record failed after 3 attempts"
  end

  return result
end
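
A usage sketch; the payload is hypothetical and must already be a serialized string:

stream = kinesis_client.stream_name('pageviews')
payload = { 'user_id' => 42, 'url' => '/home' }.to_json   # hypothetical event

kinesis_client.stream_write(stream, payload)   # returns true on success, raises after repeated failure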