class Polipus::Storage::CassandraStore

Constants

BINARY_FIELDS

Attributes

cluster[RW]

CassandraStore wants to persists documents (please ignore the jargon inherited from MongoDB) like the following JSON-ish entry:

> db.find({})

 {
   "_id" : ObjectId("...."),
   "url" : "https://www.awesome.org/meh",
   "code" : 200,
   "depth" : 0,
   "referer" : "",
   "redirect_to" : "",
   "response_time" : 1313,
   "fetched" : true,
   "user_data" :
     {
       "imported" : false,
       "is_developer" : false,
       "last_modified" : null
     },
    "fetched_at" : 1434977757,
    "error" : "",
    "uuid" : "4ddce293532ea2454356a4210e61c363"
}
keyspace[RW]

CassandraStore wants to persists documents (please ignore the jargon inherited from MongoDB) like the following JSON-ish entry:

> db.find({})

 {
   "_id" : ObjectId("...."),
   "url" : "https://www.awesome.org/meh",
   "code" : 200,
   "depth" : 0,
   "referer" : "",
   "redirect_to" : "",
   "response_time" : 1313,
   "fetched" : true,
   "user_data" :
     {
       "imported" : false,
       "is_developer" : false,
       "last_modified" : null
     },
    "fetched_at" : 1434977757,
    "error" : "",
    "uuid" : "4ddce293532ea2454356a4210e61c363"
}
table[RW]

CassandraStore wants to persists documents (please ignore the jargon inherited from MongoDB) like the following JSON-ish entry:

> db.find({})

 {
   "_id" : ObjectId("...."),
   "url" : "https://www.awesome.org/meh",
   "code" : 200,
   "depth" : 0,
   "referer" : "",
   "redirect_to" : "",
   "response_time" : 1313,
   "fetched" : true,
   "user_data" :
     {
       "imported" : false,
       "is_developer" : false,
       "last_modified" : null
     },
    "fetched_at" : 1434977757,
    "error" : "",
    "uuid" : "4ddce293532ea2454356a4210e61c363"
}

Public Class Methods

new(options = {}) click to toggle source
# File lib/polipus-cassandra/storage/cassandra_store.rb, line 41
def initialize(options = {})
  @cluster = options[:cluster]
  @keyspace = options[:keyspace]
  @table = options[:table]
  @except = options[:except] || []
  @semaphore = Mutex.new
end

Public Instance Methods

add(page) click to toggle source

{

'url'           => @url.to_s,
'headers'       => Marshal.dump(@headers),
'body'          => @body,
'links'         => links.map(&:to_s),
'code'          => @code,
'depth'         => @depth,
'referer'       => @referer.to_s,
'redirect_to'   => @redirect_to.to_s,
'response_time' => @response_time,
'fetched'       => @fetched,
'user_data'     => @user_data.nil? ? {} : @user_data.marshal_dump,
'fetched_at'    => @fetched_at,
'error'         => @error.to_s

}

# File lib/polipus-cassandra/storage/cassandra_store.rb, line 65
def add(page)
  @semaphore.synchronize do
    table_ = [keyspace, table].compact.join '.'
    uuid_ = uuid(page)
    obj = page.to_hash
    Array(@except).each { |e| obj.delete(e.to_s) }

    begin
      BINARY_FIELDS.each do |field|
        obj[field] = obj[field].to_s.encode('UTF-8', {
          invalid: :replace,
          undef: :replace,
          replace: '?' }) if can_be_converted?(obj[field])
        # ec = Encoding::Converter.new("ASCII-8BIT", "UTF-8")
        # obj[field] = ec.convert(obj[field]) if can_be_converted?(obj[field])
        # obj[field] = obj[field].force_encoding('ASCII-8BIT').force_encoding('UTF-8') if can_be_converted?(obj[field])
      end

      json = MultiJson.encode(obj)

      url = obj.fetch('url', nil)
      code = obj.fetch('code', nil)
      depth = obj.fetch('depth', nil)
      referer = obj.fetch('referer', nil)
      redirectto = obj.fetch('redirect_to', nil)
      response_time = obj.fetch('response_time', nil)
      fetched = obj.fetch('fetched', nil)
      error = obj.fetch('error', nil)
      page = Zlib::Deflate.deflate(json)

      if obj.has_key?('user_data') && !obj['user_data'].empty?
        user_data = MultiJson.encode(obj['user_data'])
      else
        user_data = nil
      end

      value = obj.fetch('fetched_at', nil)
      fetched_at = case value
      when Fixnum
        Time.at(value)
      when String
        Time.parse(value)
      else
        nil
      end

      column_names = %w[ uuid url code depth referer redirect_to response_time fetched user_data fetched_at error page ]
      values_placeholders = column_names.map{|_| '?'}.join(',')
      statement = "INSERT INTO #{table_} ( #{column_names.join(',')} ) VALUES (#{values_placeholders});"

      session.execute(
        session.prepare(statement),
        arguments: [
          uuid_,
          url,
          code,
          depth,
          referer,
          redirectto,
          response_time,
          fetched,
          user_data,
          fetched_at,
          error,
          page
        ])

    rescue Encoding::UndefinedConversionError
      puts $!.error_char.dump
      puts $!.error_char.encoding
    end

    uuid_
  end
end
clear() click to toggle source
# File lib/polipus-cassandra/storage/cassandra_store.rb, line 141
def clear
  table_ = [keyspace, table].compact.join '.'
  statement = "DROP TABLE #{table_};"
  session.execute statement
end
count() click to toggle source

TBH I’m not sure if being “defensive” and returning 0/nil in case the results is_empty? … I’m leaving (now) the code simple and noisy if something went wrong in the COUNT.

# File lib/polipus-cassandra/storage/cassandra_store.rb, line 150
def count
  table_ = [keyspace, table].compact.join '.'
  statement = "SELECT COUNT (*) FROM #{table_} ;"
  result = session.execute(statement)
  result.first['count']
end
each() { |data, page| ... } click to toggle source
# File lib/polipus-cassandra/storage/cassandra_store.rb, line 157
def each
  table_ = [keyspace, table].compact.join '.'
  statement = "SELECT * FROM #{table_};"
  session.execute(statement).each do |data|
    page = load_page(data) unless data.nil?
    yield data['uuid'], page
  end
end
exists?(page) click to toggle source
# File lib/polipus-cassandra/storage/cassandra_store.rb, line 166
def exists?(page)
  @semaphore.synchronize do
    table_ = [keyspace, table].compact.join '.'
    statement = "SELECT uuid FROM #{table_} WHERE uuid = ? LIMIT 1;"
    results = session.execute(session.prepare(statement),
                              arguments: [uuid(page)])
    !results.first.nil?
  end
end
get(page) click to toggle source
# File lib/polipus-cassandra/storage/cassandra_store.rb, line 176
def get(page)
  @semaphore.synchronize do
    table_ = [keyspace, table].compact.join '.'
    statement = "SELECT * FROM #{table_} WHERE uuid = ? LIMIT 1;"
    results = session.execute(session.prepare(statement),
                              arguments: [uuid(page)])
    data = results.first
    load_page(data) unless data.nil?
  end
end
keyspace!(replication = nil, durable_writes = true) click to toggle source
# File lib/polipus-cassandra/storage/cassandra_store.rb, line 187
def keyspace!(replication = nil, durable_writes = true)
  replication ||= "{'class': 'SimpleStrategy', 'replication_factor': '3'}"
  statement = "CREATE KEYSPACE IF NOT EXISTS #{keyspace} WITH replication = #{replication} AND durable_writes = #{durable_writes};"
  cluster.connect.execute statement
end
load_page(data) click to toggle source
# File lib/polipus-cassandra/storage/cassandra_store.rb, line 229
def load_page(data)
  json = Zlib::Inflate.inflate(data['page'])
  hash = MultiJson.decode(json)
  page = Page.from_hash(hash)
  page.fetched_at = 0 if page.fetched_at.nil?
  page
end
remove(page) click to toggle source
# File lib/polipus-cassandra/storage/cassandra_store.rb, line 193
def remove(page)
  @semaphore.synchronize do
    table_ = [keyspace, table].compact.join '.'
    statement = "DELETE FROM #{table_} WHERE uuid = ?;"
    session.execute(session.prepare(statement),
                    arguments: [uuid(page)])
    true
  end
end
session() click to toggle source
# File lib/polipus-cassandra/storage/cassandra_store.rb, line 203
def session
  @session ||= @cluster.connect(keyspace)
end
table!(properties = nil) click to toggle source
# File lib/polipus-cassandra/storage/cassandra_store.rb, line 207
def table!(properties = nil)
  table_ = [keyspace, table].compact.join '.'
  def_ = "CREATE TABLE IF NOT EXISTS #{table_}
    (
      uuid TEXT PRIMARY KEY,
      url TEXT,
      code INT,
      depth INT,
      referer TEXT,
      redirect_to TEXT,
      response_time BIGINT,
      fetched BOOLEAN,
      user_data TEXT,
      fetched_at TIMESTAMP,
      error TEXT,
      page BLOB
    )"
  props = properties.to_a.join(' AND ')
  statement = props.empty? ? "#{def_};" : "#{def_} WITH #{props};"
  session.execute statement
end

Private Instance Methods

can_be_converted?(field) click to toggle source
# File lib/polipus-cassandra/storage/cassandra_store.rb, line 239
def can_be_converted?(field)
  !field.nil? && field.is_a?(String) && !field.empty?
end