class RedTrack::Client

Constants

TAG

Public Class Methods

new(options) click to toggle source

Constructor for the client - initialize instance variables

@param [Hash] options Options to the client - see README.md

# File lib/redtrack_client.rb, line 22
# Builds the client: creates a STDOUT logger, selects the broker, binds the
# data type checker and configures the AWS SDK.
#
# @param [Hash] options Options to the client. Keys read here are
#   :kinesis_enabled, :access_key_id, :secret_access_key and :region. A
#   :logger entry is written into this same hash (the caller's object is
#   modified) before it is passed on and stored as @options.
def initialize(options)
  # The logger travels inside the options hash to every collaborator
  @logger = options[:logger] = Logger.new(STDOUT)

  # Only a literal `true` selects Kinesis; any other value uses the file broker
  @broker =
    if options[:kinesis_enabled] == true
      @logger.debug("#{TAG} Kinesis enabled. create KinesisClient")
      RedTrack::KinesisClient.new(options)
    else
      @logger.debug("#{TAG} Kinesis disabled. create FileClient")
      RedTrack::FileClient.new(options)
    end

  # Interface used by #write to check column values
  @data_types = RedTrack::DataTypes.new(options)
  @valid_data_types = @data_types.valid_data_types

  AWS.config(
    :access_key_id => options[:access_key_id],
    :secret_access_key => options[:secret_access_key],
    :region => options[:region]
  )

  @options = options
end

Public Instance Methods

create_kinesis_loads_table() click to toggle source

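Creates the 'kinesis_loads' table in Redshift (the create table statement is executed immediately).

@return [Object] The result of executing the create table statement against Redshift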

# File lib/redtrack_client.rb, line 222
# Creates the 'kinesis_loads' table by handing a fixed schema to
# create_table_from_schema with execution enabled.
#
# @return Whatever create_table_from_schema returns for an executed statement
def create_kinesis_loads_table
  varchar_columns = [
    :stream_name,
    :shard_id,
    :table_name,
    :starting_sequence_number,
    :ending_sequence_number
  ]

  # Insertion order is the column order of the generated statement
  columns = {}
  varchar_columns.each do |column_name|
    columns[column_name] = { :type => 'varchar(64)' }
  end
  columns[:load_timestamp] = { :type => 'timestamp', :constraint => 'not null' }

  create_table_from_schema(
    'kinesis_loads',
    true,
    { :columns => columns, :sortkey => 'load_timestamp' }
  )
end
create_kinesis_stream_for_table(table,shard_count=1) click to toggle source

Create a kinesis stream for the table - use configuration

@param [String] table The name of the table @param [Integer] shard_count The number of shards in the stream @return [Object, false] The broker's result, or false when Kinesis is not enabled

# File lib/redtrack_client.rb, line 242
# Asks the broker to create a Kinesis stream for the table.
#
# @param [String] table The name of the table
# @param [Integer] shard_count The number of shards in the stream
# @return The broker's result, or false when :kinesis_enabled is not set
def create_kinesis_stream_for_table(table,shard_count=1)
  # Without Kinesis there is nothing to create; warn and report false
  unless @options[:kinesis_enabled]
    @logger.warn("#{TAG} Kinesis is not enabled. Nothing done.")
    return false
  end

  @broker.create_kinesis_stream_for_table(table,shard_count)
end
create_table_from_schema(table,exec=true,schema=nil) click to toggle source

Returns a SQL statement for creating a Redshift table per the defined schema (optionally executing it)

@param [String] table The name of the table @param [Boolean] exec Whether to execute the statement @param [Hash] schema The table schema to use - if not provided, it is looked up from the :redshift_schema option @return [String] Returns the create table string when exec is false; otherwise the result of executing it (false if no schema exists for the table)

# File lib/redtrack_client.rb, line 182
# Builds a "create table" statement from a schema hash and optionally runs it.
#
# @param [String] table The name of the table
# @param [Boolean] exec Whether to execute the statement on a new connection
# @param [Hash] schema Schema with :columns (name => { :type, :constraint })
#   and an optional :sortkey; looked up via get_table_schema when nil
# @return The statement text when exec is falsy, the connection's exec result
#   when exec is truthy, or false when no schema is found for the table
def create_table_from_schema(table,exec=true,schema=nil)

  if schema.nil?
    schema = get_table_schema(table)
    unless schema
      @logger.warn("#{TAG} No schema exists for table #{table}")
      return false
    end
  end

  statement = "create table #{table} (\n"

  # One "name type [constraint]" entry per column, in schema order
  column_lines = schema[:columns].map do |column_name, column|
    definition = "#{column_name} " + column[:type]
    definition += " " + column[:constraint] unless column[:constraint].nil?
    definition
  end

  # Commas separate the entries; every entry sits on its own line
  statement += column_lines.join(",\n")
  statement += "\n" unless column_lines.empty?
  statement += ")"

  statement +=
    if schema[:sortkey].nil?
      ";\n"
    else
      "\nsortkey(" + schema[:sortkey] + ");\n"
    end

  return statement unless exec

  new_redshift_connection().exec(statement)
end
get_table_schema(table) click to toggle source

Gets a schema hash object for a specific table

@param [String] table The name of the redshift table @return [Hash] Hash object containing the column definitions

# File lib/redtrack_client.rb, line 159
# Looks up the schema hash for one table in the :redshift_schema option.
#
# @param [String] table The name of the redshift table
# @return [Hash, nil] The table's schema entry, or nil when none is registered
# @raise [RuntimeError] When the client was created without :redshift_schema
def get_table_schema(table)
  all_schemas = @options[:redshift_schema]
  raise 'Must pass :redshift_schema as option when creating RedTrack client' if all_schemas.nil?

  # Tables may be registered under a symbol or a string key; the symbol wins.
  # The trailing nil keeps the "nothing found" result nil even for false entries.
  all_schemas[table.to_sym] || all_schemas[table.to_s] || nil
end
new_loader(loader_options={}) click to toggle source

Create a new loader client

@param [Hash] loader_options The options to pass to the loader @return [RedTrack::Loader] The loader client

# File lib/redtrack_client.rb, line 56
# Creates a loader wired to this client's broker and Redshift connection.
#
# @param [Hash] loader_options The options to pass to the loader, layered
#   over the client's options via merge_options
# @return [RedTrack::Loader] The loader client
def new_loader(loader_options={})
  loader_settings = merge_options(loader_options)

  # Reuse the connection held in @redshift_conn; open one only if none exists
  @redshift_conn = new_redshift_connection(loader_options) if @redshift_conn.nil?

  RedTrack::Loader.new(loader_settings,@broker,@redshift_conn)
end
new_redshift_connection(connection_options={}) click to toggle source

Create a new redshift connection

@param [Hash] connection_options A set of options to pass to PG.connect. Uses options passed to redtrack client by default @return [PG::Connection] Postgres client connection

# File lib/redtrack_client.rb, line 70
# Opens a new Redshift connection through PG.connect and stores it in
# @redshift_conn, replacing whatever was held there.
#
# @param [Hash] connection_options Overrides layered over the client's
#   options; the :redshift_host, :redshift_port, :redshift_dbname,
#   :redshift_user and :redshift_password entries are used
# @return The value returned by PG.connect
def new_redshift_connection(connection_options={})
  settings = merge_options(connection_options)

  host, port, dbname, user, password = settings.values_at(
    :redshift_host,
    :redshift_port,
    :redshift_dbname,
    :redshift_user,
    :redshift_password
  )

  @redshift_conn = PG.connect(
    :host => host,
    :port => port,
    :dbname => dbname,
    :user => user,
    :password => password)
end
write(table,data,partition_key=nil) click to toggle source

Check the data to ensure it conforms to the table schema and write it to the data broker for the table. If no partition key is passed, a random one is generated, so the shard to write to is chosen randomly

@param [String] table The name of the redshift table to write to @param [Hash] data hash containing data to write to the table. Key is column name @param [String] partition_key optional, used to determine which kinesis shard to write the data to @return [Boolean] Whether or not the write succeeded

# File lib/redtrack_client.rb, line 90
# Validates a row against the table schema and writes it to the broker as JSON.
#
# @param [String] table The name of the redshift table to write to
# @param [Hash] data Row to write, keyed by column name (symbols only). Values
#   are replaced in place by the results of the data type checks.
# @param [String] partition_key Optional key handed to the broker; when
#   omitted a random number from 0 to 99 (as a string) is used
# @return The result of the broker's stream_write call
# @raise [RuntimeError] When the table has no schema, a key is not a symbol or
#   not in the schema, a "not null" column is absent, or a column type is not
#   one of the valid data types
def write(table,data,partition_key=nil)

  table_schema = get_table_schema(table)
  raise "Scheme does not exist for table name ='#{table}'" if table_schema.nil?

  ## Keys must already be symbols; string keys are rejected, not converted
  data.keys.each do |key|
    raise "Data key #{key} is not a symbol!" unless key.is_a?(Symbol)
    # TODO: CONVERT string keys to symbols instead of raising
  end

  columns = table_schema[:columns]
  shared_columns = columns.keys & data.keys

  ## Reject the first data key that the schema does not know about
  unknown_keys = data.keys - shared_columns
  unless unknown_keys.empty?
    raise "Data key #{unknown_keys.first} is not in schema for #{table} table!!"
  end

  ## Every "not null" column has to be present in the data
  columns.each do |column_name,column|
    required = column.key?(:constraint) && column[:constraint] == "not null"
    if required && !shared_columns.include?(column_name)
      raise "Column #{column_name} is missing from passed data"
    end
  end

  ## Run the type check for each supplied column and keep the checked value
  columns.each do |column_name,column|
    next unless shared_columns.include?(column_name)

    value = data[column_name.to_sym]
    column_type = column[:type]

    # "varchar(64)" -> "varchar"; types without parentheses are used as-is
    type_name = column_type[/(.*)\(.*/,1] || column_type

    unless @valid_data_types.include?(type_name)
      raise "Invalid data type #{type_name}. Valid types [#{@valid_data_types.join(",")}]"
    end

    data[column_name.to_sym] = @data_types.send("check_#{type_name}".to_sym,value,column_type,column_name)
  end

  ## Redshift loads the data as JSON, so serialize the row that way
  payload = data.to_json

  ## Hand the serialized row to the broker
  shard_key = partition_key || rand(100).to_s
  @broker.stream_write(@broker.stream_name(table), payload, shard_key)
end

Private Instance Methods

is_numeric(value) click to toggle source

Determine whether the value can be interpreted as a number (e.g. a numeric string)

@param [Numeric] value The value to check as valid numeric @return [Boolean] Whether or not the value is a numeric

# File lib/redtrack_client.rb, line 269
# Reports whether Float() accepts the value.
#
# @param [Object] value The value to check (number or numeric string)
# @return [Boolean] true when the conversion succeeds, false when it raises
def is_numeric(value)
  !Float(value).nil?
rescue StandardError
  # Float() raises for nil, non-numeric strings and unconvertible objects
  false
end
is_redshift_timestamp(value) click to toggle source

Determine whether the typed value is a timestamp as defined by redshift. This is more restrictive than ruby parsing b/c of redshift See: docs.aws.amazon.com/redshift/latest/dg/r_DATEFORMAT_and_TIMEFORMAT_strings.html

@param [String] value The value to check as a valid timestamp: “YYYY-MM-DD HH:mm:ss” is only accepted format @return [Boolean] Whether or not the value is a timestamp as accepted by redshift

# File lib/redtrack_client.rb, line 278
# Reports whether the value is a string of exactly the form
# "YYYY-MM-DD HH:mm:ss" (digits only, no surrounding text or newline).
#
# @param [String] value The value to check as a valid timestamp
# @return [Boolean] Whether the value matches the accepted timestamp format
def is_redshift_timestamp(value)
  return false unless value.is_a?(String)

  # \A and \z anchor the whole string, so a trailing newline is rejected
  !(value =~ /\A\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d\z/).nil?
end
merge_options(options) click to toggle source

Merge options between passed options and the default options in RedTrack client

@param [Hash] options The set of options passed

# File lib/redtrack_client.rb, line 257
# Layers the passed options over the client's default options.
#
# The result is a new hash. The previous implementation wrote the passed
# entries straight into @options, so a per-call override (for example the
# connection options of one new_redshift_connection call) silently became a
# permanent default for every later call.
#
# @param [Hash] options The set of options passed; these win over defaults
# @return [Hash] A copy of the client's options with the passed options applied
def merge_options(options)
  @options.merge(options)
end