class RedTrack::FileClient

Public Class Methods

new(options)

Set up the file-backed client: create the log/ directory if it is missing and store the options.

@param [Hash] options Stored as-is; stream_name later reads :redshift_cluster_name and :redshift_dbname from it
@return [RedTrack::FileClient] The new client

# File lib/redtrack_local_file_stream.rb, line 15
def initialize(options)

  # Create the log/ directory if it does not already exist
  unless File.directory?("log")
    Dir.mkdir "log"
  end

  @options = options
end
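
The directory check in the constructor can also be written with `FileUtils.mkdir_p`, which does nothing when the directory already exists. The following is a minimal sketch, not the library's code: the helper name `ensure_log_dir` and the `base_dir` argument are hypothetical, introduced so the example can run inside a temporary directory instead of the current working directory.

```ruby
require 'fileutils'
require 'tmpdir'

# Hypothetical helper: make sure <base_dir>/log exists and return its path.
# FileUtils.mkdir_p does not raise if the directory is already present.
def ensure_log_dir(base_dir)
  log_dir = File.join(base_dir, 'log')
  FileUtils.mkdir_p(log_dir)
  log_dir
end

Dir.mktmpdir do |dir|
  ensure_log_dir(dir)
  ensure_log_dir(dir) # second call is a no-op
  puts File.directory?(File.join(dir, 'log'))
end
```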

Public Instance Methods

get_shard_descriptions(stream_name)

Return a fake shard description for the file-backed stream, using the hostname as the shard_id

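@param [String] stream_name The name of the kinesis stream (not used by this file-backed client)
@return [Array<Hash>] A single-element array whose hash contains :shard_id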

# File lib/redtrack_local_file_stream.rb, line 61
def get_shard_descriptions(stream_name)
  return [{
      :shard_id => `hostname`.tr("\n","")
  }]
end
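
The listing above shells out to the `hostname` command. As an alternative sketch (a swapped-in technique, not what the library does), Ruby's standard `Socket.gethostname` returns the hostname without spawning a subprocess. The function name `fake_shard_descriptions` is hypothetical.

```ruby
require 'socket'

# Hypothetical stand-in for get_shard_descriptions: one fake shard per host,
# identified by the machine's hostname.
def fake_shard_descriptions
  [{ :shard_id => Socket.gethostname }]
end

puts fake_shard_descriptions.first[:shard_id]
```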
get_shard_iterator_from_sequence_number(stream_name,shard_description,starting_sequence_number=nil)

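Get the shard iterator given a checkpointed sequence number. If there is no checkpoint, reading starts from the beginning of the shard. For this file-backed client the iterator is simply the stream's file location; shard_description and starting_sequence_number are not used.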

@param [String] stream_name The name of the stream to get a shard iterator for
@param [Hash] shard_description Result from describe stream request
@param [String] starting_sequence_number The sequence number to get a shard iterator for; if it doesn't exist, get one for the start of the shard
@return [String] The shard iterator

# File lib/redtrack_local_file_stream.rb, line 73
def get_shard_iterator_from_sequence_number(stream_name,shard_description,starting_sequence_number=nil)
  return self.stream_location(stream_name)
end
stream_has_data(stream_name)

Whether or not the stream has data

@param [String] stream_name The name of the stream
@return [Boolean] Whether or not the stream has data

# File lib/redtrack_local_file_stream.rb, line 38
def stream_has_data(stream_name)
  # V1 of data streaming - use a local file
  return File.exist?(self.stream_location(stream_name))
end
stream_location(stream_name)

Get Location of the stream

@param [String] stream_name The name of the stream
@return [String] URL/file location for the stream

# File lib/redtrack_local_file_stream.rb, line 29
def stream_location(stream_name)
  # V1 of data streaming - use a local file
  return "log/#{stream_name}"
end
stream_name(redshift_table)

Name of the stream in the data broker (This is a Kinesis stream name)

@param [String] redshift_table Name of the redshift table
@return [String] Name of the stream in Kinesis

# File lib/redtrack_local_file_stream.rb, line 120
def stream_name(redshift_table)
  result= @options[:redshift_cluster_name] + '.' + @options[:redshift_dbname] + ".#{redshift_table}"
  return result
end
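
The naming rule above joins the cluster name, database name, and table name with dots. A minimal stand-alone sketch of that rule follows; the function `build_stream_name` and the sample option values are hypothetical and exist only for illustration.

```ruby
# Hypothetical stand-alone version of the naming rule:
# "<redshift_cluster_name>.<redshift_dbname>.<redshift_table>"
def build_stream_name(options, redshift_table)
  [options[:redshift_cluster_name], options[:redshift_dbname], redshift_table].join('.')
end

# Sample values are made up for the example.
options = { :redshift_cluster_name => 'analytics', :redshift_dbname => 'events_db' }
puts build_stream_name(options, 'page_views')
```

Combined with stream_location, a stream with this name would be stored under `log/` with the same dotted name as its file name.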
stream_read_from_shard_iterator_into_files(shard_iterator, files, options={})

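Read from the shard into files. For this file-backed client, the stream file is renamed with a timestamp suffix and its lines are distributed round-robin across the given files.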

@param [String] shard_iterator The shard iterator to start reading from, the result of get_shard_iterator; for this client it is the stream file path (Kinesis reference: docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html)
@param [Array<IO>] files Array of open, writable files to read into
@param [Hash] options Optional. Can specify :max_records, :max_requests, :max_consecutive_requests_without_data, :backoff_no_data (none of these are read by this method)
@return [Hash] Hash with :records (number of records read), :starting_sequence_number, and :ending_sequence_number

# File lib/redtrack_local_file_stream.rb, line 83
def stream_read_from_shard_iterator_into_files(shard_iterator, files, options={})

  stream_file_name = shard_iterator

  records = 0
  num_files = files.length

  fake_sequence_number = Time.now.to_i

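  if File.exist?(stream_file_name)
    # Rotate the stream file so later writes start a fresh file while this one is read
    rotated_file_name = "#{stream_file_name}.#{fake_sequence_number}"
    FileUtils.mv(stream_file_name, rotated_file_name)

    # Distribute lines round-robin across the output files; the block form closes the file
    File.open(rotated_file_name, 'r') do |stream_file|
      while (line = stream_file.gets)
        files[records % num_files].puts line + "\n"
        records += 1
      end
    end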

    result = {
      :starting_sequence_number => fake_sequence_number,
      :ending_sequence_number => fake_sequence_number,
      :records => records
    }
  else
    result = {
      :records => 0,
      :starting_sequence_number => '',
      :ending_sequence_number => ''
    }
  end
  return result
end
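
The core of the read loop is a round-robin distribution: record number `n` goes to file `n % num_files`. The sketch below isolates that step using in-memory `StringIO` objects in place of real files. The helper name `distribute_round_robin` is hypothetical, and unlike the listing above it writes each line as-is rather than appending an extra newline.

```ruby
require 'stringio'

# Hypothetical helper: write each line to outputs[n % outputs.length],
# where n is the zero-based record index. Returns the number of records.
def distribute_round_robin(lines, outputs)
  records = 0
  lines.each do |line|
    outputs[records % outputs.length].puts line
    records += 1
  end
  records
end

outputs = [StringIO.new, StringIO.new]
count = distribute_round_robin(["a\n", "b\n", "c\n"], outputs)
puts count
puts outputs.map(&:string).inspect
```

With two outputs and three lines, the first and third lines land in the first output and the second line in the second.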
stream_write(stream_name,data_string,partition_key=nil)

Write data to a stream

@param [String] stream_name The name of the stream
@param [String] data_string String of data to write
@param [String] partition_key Ignored
@return [Boolean] True - the write to the stream succeeded

# File lib/redtrack_local_file_stream.rb, line 49
def stream_write(stream_name,data_string,partition_key=nil)

  # V1 of data streaming - use a local file: open in append mode and write;
  # the block form closes the file even if the write raises
  File.open(self.stream_location(stream_name),"a") do |stream|
    stream.puts data_string + "\n"
  end
  return true
end
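
Taken together, the write and read methods form an append-then-rotate cycle: writers append to one file, and a reader renames that file before consuming it. The following is a minimal sketch of that cycle under stated assumptions, not the library's implementation: `append_record` and `rotate_and_read` are hypothetical helpers, the file name is made up, and the example runs in a temporary directory rather than `log/`.

```ruby
require 'fileutils'
require 'tmpdir'

# Hypothetical helper: append one record to the stream file.
def append_record(path, data_string)
  File.open(path, 'a') { |stream| stream.puts data_string }
  true
end

# Hypothetical helper: rename the stream file with a sequence suffix,
# then return its lines. Returns [] when there is nothing to read.
def rotate_and_read(path, sequence_number)
  return [] unless File.exist?(path)
  rotated = "#{path}.#{sequence_number}"
  FileUtils.mv(path, rotated)
  File.readlines(rotated, chomp: true)
end

Dir.mktmpdir do |dir|
  path = File.join(dir, 'cluster.db.events')
  append_record(path, '{"id":1}')
  append_record(path, '{"id":2}')
  puts rotate_and_read(path, Time.now.to_i).inspect
end
```

Renaming before reading means any record written afterwards goes to a new file under the original name, so it is picked up by the next read rather than lost.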