class Polipus::QueueOverflow::CassandraQueue
Attributes
CassandraQueue
wants to persist documents (please, still ignore the jargon inherited from Mongo) like the following JSON-ish entry.
There is no superclass here but I’ve in mind the interface implicitly defined by Polipus::QueueOverflow::DevNullQueue that, more or less has:
def initialize def length def empty? def clear def push(_data) def pop(_ = false)
Taking some data from our backend.production.*****.com/polipus I found:
mongos> db.getCollectionNames() [
"data-com-companies", "data_com_companies", "googleplus", "linkedin", "linkedin-companies", "linkedin_companies_parsed", "linkedin_jobs", "linkedin_jobs_parsed", "linkedin_pages_errors", "polipus_q_overflow_data-com-companies_queue_overflow", "polipus_q_overflow_data_com_companies_queue_overflow", "polipus_q_overflow_googleplus_queue_overflow", "polipus_q_overflow_linkedin-companies_queue_overflow", "polipus_q_overflow_linkedin_jobs_queue_overflow", "polipus_q_overflow_linkedin_jobs_queue_overflow_old", "polipus_q_overflow_linkedin_refresh_queue_overflow", "system.indexes"
]
mongos> db.getCollection("polipus_q_overflow_linkedin_jobs_queue_overflow").find().limit(1) {
"_id" : ObjectId("54506b98e3d55b20c40b32d3"), "payload" : "{\"url\":\"https://www.linkedin.com/job/product-designer-jobs/?page_num=7&trk=jserp_pagination_next\",\"depth\":6,\"referer\":\"https://www.linkedin.com/job/product-designer-jobs/?page_num=6&trk=jserp_pagination_6\",\"fetched\":false}"
}
mongos> db.polipus_q_overflow_linkedin_refresh_queue_overflow.find().limit(10) {
"_id" : ObjectId("544072b6e3d55b0db7000001"), "payload" : "{\"url\":\"http://www.linkedin.com/in/*****\",\"depth\":0,\"fetched\":false}"
}
We also assume this MonkeyPatch: Polipus::QueueOverflow.cassandra_queue(namespace, options = {}) that returns instances of this class.
CassandraQueue
wants to persist documents (please, still ignore the jargon inherited from Mongo) like the following JSON-ish entry.
There is no superclass here but I’ve in mind the interface implicitly defined by Polipus::QueueOverflow::DevNullQueue that, more or less has:
def initialize def length def empty? def clear def push(_data) def pop(_ = false)
Taking some data from our backend.production.*****.com/polipus I found:
mongos> db.getCollectionNames() [
"data-com-companies", "data_com_companies", "googleplus", "linkedin", "linkedin-companies", "linkedin_companies_parsed", "linkedin_jobs", "linkedin_jobs_parsed", "linkedin_pages_errors", "polipus_q_overflow_data-com-companies_queue_overflow", "polipus_q_overflow_data_com_companies_queue_overflow", "polipus_q_overflow_googleplus_queue_overflow", "polipus_q_overflow_linkedin-companies_queue_overflow", "polipus_q_overflow_linkedin_jobs_queue_overflow", "polipus_q_overflow_linkedin_jobs_queue_overflow_old", "polipus_q_overflow_linkedin_refresh_queue_overflow", "system.indexes"
]
mongos> db.getCollection("polipus_q_overflow_linkedin_jobs_queue_overflow").find().limit(1) {
"_id" : ObjectId("54506b98e3d55b20c40b32d3"), "payload" : "{\"url\":\"https://www.linkedin.com/job/product-designer-jobs/?page_num=7&trk=jserp_pagination_next\",\"depth\":6,\"referer\":\"https://www.linkedin.com/job/product-designer-jobs/?page_num=6&trk=jserp_pagination_6\",\"fetched\":false}"
}
mongos> db.polipus_q_overflow_linkedin_refresh_queue_overflow.find().limit(10) {
"_id" : ObjectId("544072b6e3d55b0db7000001"), "payload" : "{\"url\":\"http://www.linkedin.com/in/*****\",\"depth\":0,\"fetched\":false}"
}
We also assume this MonkeyPatch: Polipus::QueueOverflow.cassandra_queue(namespace, options = {}) that returns instances of this class.
CassandraQueue
wants to persist documents (please, still ignore the jargon inherited from Mongo) like the following JSON-ish entry.
There is no superclass here but I’ve in mind the interface implicitly defined by Polipus::QueueOverflow::DevNullQueue that, more or less has:
def initialize def length def empty? def clear def push(_data) def pop(_ = false)
Taking some data from our backend.production.*****.com/polipus I found:
mongos> db.getCollectionNames() [
"data-com-companies", "data_com_companies", "googleplus", "linkedin", "linkedin-companies", "linkedin_companies_parsed", "linkedin_jobs", "linkedin_jobs_parsed", "linkedin_pages_errors", "polipus_q_overflow_data-com-companies_queue_overflow", "polipus_q_overflow_data_com_companies_queue_overflow", "polipus_q_overflow_googleplus_queue_overflow", "polipus_q_overflow_linkedin-companies_queue_overflow", "polipus_q_overflow_linkedin_jobs_queue_overflow", "polipus_q_overflow_linkedin_jobs_queue_overflow_old", "polipus_q_overflow_linkedin_refresh_queue_overflow", "system.indexes"
]
mongos> db.getCollection("polipus_q_overflow_linkedin_jobs_queue_overflow").find().limit(1) {
"_id" : ObjectId("54506b98e3d55b20c40b32d3"), "payload" : "{\"url\":\"https://www.linkedin.com/job/product-designer-jobs/?page_num=7&trk=jserp_pagination_next\",\"depth\":6,\"referer\":\"https://www.linkedin.com/job/product-designer-jobs/?page_num=6&trk=jserp_pagination_6\",\"fetched\":false}"
}
mongos> db.polipus_q_overflow_linkedin_refresh_queue_overflow.find().limit(10) {
"_id" : ObjectId("544072b6e3d55b0db7000001"), "payload" : "{\"url\":\"http://www.linkedin.com/in/*****\",\"depth\":0,\"fetched\":false}"
}
We also assume this MonkeyPatch: Polipus::QueueOverflow.cassandra_queue(namespace, options = {}) that returns instances of this class.
Public Class Methods
There is a validation enforced on `:keyspace` and `:table` because Cassandra is not happy when a keyspace or a table name contains a hyphen.
# File lib/polipus-cassandra/queue_overflow/cassandra_queue.rb, line 67
#
# Builds an overflow queue backed by a Cassandra table.
#
# Required options:
#   :cluster  - a Cassandra cluster object used to open sessions.
#   :keyspace - keyspace name; hyphens are replaced with underscores
#               because Cassandra rejects hyphens in identifiers.
#   :table    - table name; hyphens replaced for the same reason.
# Optional:
#   :logger   - defaults to an INFO-level Logger writing to STDOUT.
#
# Raises ArgumentError (with an explanatory message, instead of the
# previous bare raise) when any required option is missing.
def initialize(options = {})
  unless options_are_valid?(options)
    raise ArgumentError, "Options must include :cluster, :keyspace and :table (got #{options.keys.inspect})"
  end
  @cluster = options[:cluster]
  @keyspace = options[:keyspace].gsub("-", "_")
  @table = options[:table].gsub("-", "_")
  @semaphore = Mutex.new
  @options = options
  @timeuuid_generator = Cassandra::Uuid::Generator.new
  # NOTE: `||=` deliberately writes the default logger back into the
  # caller-supplied options hash; kept to preserve original behaviour.
  @logger = @options[:logger] ||= Logger.new(STDOUT).tap { |l| l.level = Logger::INFO }
end
Public Instance Methods
Clear is a fancy name for a DROP TABLE IF EXISTS on the fully-qualified `keyspace.table` name.
# File lib/polipus-cassandra/queue_overflow/cassandra_queue.rb, line 100
#
# Drops the backing table entirely (DROP TABLE IF EXISTS keyspace.table).
def clear
  qualified_name = [keyspace, table].compact.join('.')
  session.execute("DROP TABLE IF EXISTS #{qualified_name} ;")
end
Return true if the table has no rows. This is achieved with a ‘SELECT WITH LIMIT 1’ query.
# File lib/polipus-cassandra/queue_overflow/cassandra_queue.rb, line 95
#
# True when the table holds no rows, checked cheaply via a
# SELECT ... LIMIT 1 (through #get) rather than a full COUNT.
def empty?
  get.first.nil?
end
# File lib/polipus-cassandra/queue_overflow/cassandra_queue.rb, line 218
#
# Creates the keyspace if it does not exist yet.
# `replication` defaults to SimpleStrategy with replication_factor 3.
# Connects without a keyspace (cluster.connect) since the keyspace may
# not exist yet.
def keyspace!(replication = nil, durable_writes = true)
  replication_config = replication || "{'class': 'SimpleStrategy', 'replication_factor': '3'}"
  ddl = "CREATE KEYSPACE IF NOT EXISTS #{keyspace} WITH replication = #{replication_config} AND durable_writes = #{durable_writes};"
  cluster.connect.execute(ddl)
end
Length aka Size aka Count is supported in Cassandra… like in your PostgreSQL, you can COUNT.
SELECT COUNT (*) FROM keyspace.table_name;
TBH I’m not sure whether being “defensive” and returning 0/nil when the result is empty is worth it… I’m leaving (for now) the code simple and noisy if something goes wrong in the COUNT.
# File lib/polipus-cassandra/queue_overflow/cassandra_queue.rb, line 86
#
# Number of rows in the backing table, via SELECT COUNT (*).
# Deliberately not defensive: if the COUNT fails, the error propagates.
def length
  qualified_name = [keyspace, table].compact.join('.')
  rows = session.execute("SELECT COUNT (*) FROM #{qualified_name} ;")
  rows.first['count']
end
Pop removes ‘n’ entries from the overflow table (treated as a queue) and returns a paged result. results.class #=> Cassandra::Results::Paged
Polipus
is expecting a String that will be JSON-parsed in order to build a
# File lib/polipus-cassandra/queue_overflow/cassandra_queue.rb, line 151
#
# Removes the oldest `n` entries from the overflow table and returns them.
#
# Rows are clustered ORDER BY (created_at ASC), so a plain `LIMIT n`
# SELECT (done by #get) yields the `n` oldest entries -- the same FIFO
# order Polipus gets from its Redis fast path (LPUSH on write, RPOP on
# read: 4-3-2-1 in, 1 then 2 then 3 then 4 out).
#
# Returns the bare payload String when exactly one row was popped
# (Polipus JSON-parses it), otherwise the Cassandra::Results::Paged.
def pop(n = 1)
  qualified_name = [keyspace, table].compact.join('.')
  popped = get(n)
  popped.each do |row|
    session.execute("DELETE FROM #{qualified_name} WHERE queue_name = '#{row['queue_name']}' AND created_at = #{row['created_at']} ;")
  end
  # Respect the API Polipus expects: a single pop hands back the payload
  # string itself rather than a paged result set.
  single_row = !popped.nil? && popped.respond_to?(:count) && popped.count == 1
  single_row ? popped.first['payload'] : popped
end
push is the “write into Cassandra” method.
# File lib/polipus-cassandra/queue_overflow/cassandra_queue.rb, line 107
#
# Writes one queue entry into Cassandra.
#
# `data` is a JSON String (as produced by Polipus). Its 'payload' key,
# when present and non-empty, is re-encoded and stored in the payload
# column; otherwise NULL is stored.
#
# Returns nil when `data` is nil, otherwise a String rendering of the
# written [queue_name, created_at] pair.
#
# Fix: the Encoding::UndefinedConversionError rescue used bare `puts`,
# bypassing the configured logger; it now reports through @logger.error
# like the rest of the class.
def push(data)
  return nil if data.nil?

  obj = MultiJson.decode(data)
  table_ = [keyspace, table].compact.join('.')
  queue_name = @keyspace
  created_at = @timeuuid_generator.now
  begin
    @semaphore.synchronize do
      payload =
        if obj.key?('payload') && !obj['payload'].empty?
          MultiJson.encode(obj['payload'])
        else
          nil
        end
      column_names = %w[ queue_name created_at payload ]
      values_placeholders = column_names.map { |_| '?' }.join(',')
      statement = "INSERT INTO #{table_} ( #{column_names.join(',')} ) VALUES (#{values_placeholders});"
      session.execute(
        session.prepare(statement),
        arguments: [queue_name, created_at, payload])
    end
  rescue Encoding::UndefinedConversionError => e
    @logger.error { "Encoding error while pushing entry: #{e.error_char.dump} (#{e.error_char.encoding})" }
  end
  @logger.debug { "Writing this entry [#{[queue_name, created_at].to_s}]" }
  [queue_name, created_at].to_s
end
# File lib/polipus-cassandra/queue_overflow/cassandra_queue.rb, line 224
#
# Lazily opens (and memoizes) a session bound to this queue's keyspace.
def session
  @session = @cluster.connect(keyspace) if @session.nil?
  @session
end
Taking a look at the Cassandra KEYSPACE you will find:
cqlsh> DESCRIBE KEYSPACE polipus_queue_overflow_linkedin ;
CREATE KEYSPACE polipus_queue_overflow_linkedin WITH replication = {‘class’: ‘SimpleStrategy’, ‘replication_factor’: ‘3’} AND durable_writes = true;
CREATE TABLE polipus_queue_overflow_linkedin.linkedin_overflow (
queue_name text, created_at timeuuid, payload text, PRIMARY KEY (queue_name, created_at)
) WITH CLUSTERING ORDER BY (created_at ASC)
AND bloom_filter_fp_chance = 0.01 AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}' AND comment = '' AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32'} AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'} AND dclocal_read_repair_chance = 0.1 AND default_time_to_live = 0 AND gc_grace_seconds = 864000 AND max_index_interval = 2048 AND memtable_flush_period_in_ms = 0 AND min_index_interval = 128 AND read_repair_chance = 0.0 AND speculative_retry = '99.0PERCENTILE';
This means that:
-
queue_name is partition key;
-
created_at is clustering key;
With sample data:
cqlsh> SELECT * FROM polipus_queue_overflow_linkedin.linkedin_overflow LIMIT 1 ;
queue_name | created_at | payload
---------------------------------+--------------------------------------+---------
polipus_queue_overflow_linkedin | de17ece6-1e5e-11e5-b997-47a87c40c422 | "{\"url\":\"http://www.linkedin.com/in/foobar\",\"depth\":0,\"fetched\":false}"
(1 rows) cqlsh>
# File lib/polipus-cassandra/queue_overflow/cassandra_queue.rb, line 269
#
# Creates the backing table if it does not exist yet:
# (queue_name TEXT, created_at TIMEUUID, payload TEXT) with
# PRIMARY KEY (queue_name, created_at) -- queue_name partitions,
# created_at clusters (oldest first by default ASC order).
# Extra table `properties` are joined with AND into a WITH clause.
def table!(properties = nil)
  qualified_name = [keyspace, table].compact.join('.')
  ddl = "CREATE TABLE IF NOT EXISTS #{qualified_name} ( queue_name TEXT, created_at TIMEUUID, payload TEXT, PRIMARY KEY (queue_name, created_at) )"
  with_clause = Array(properties).join(' AND ')
  if with_clause.empty?
    session.execute("#{ddl};")
  else
    session.execute("#{ddl} WITH #{with_clause};")
  end
end
Private Instance Methods
results.class => Cassandra::Results::Paged
# File lib/polipus-cassandra/queue_overflow/cassandra_queue.rb, line 294
#
# Fetches up to `limit` of the oldest rows (clustering order is
# created_at ASC) as a Cassandra::Results::Paged.
#
# A TrueClass/FalseClass `limit` is coerced to 1 so callers can use
# the `pop(_ = false)` shape of the DevNullQueue interface.
#
# Raises ArgumentError when `limit` does not coerce to a positive
# integer. Fix: the validator accepts any limit whose to_i is >= 1,
# so the message now correctly says "greater than 0" (it used to say
# "greater than 1", which contradicted the accepted value 1).
def get(limit = 1)
  # coerce to int if a TrueClass/FalseClass is given.
  limit = 1 if [true, false].include?(limit)
  unless limit_is_valid?(limit)
    raise ArgumentError, "Invalid limit value: must be an INTEGER greater than 0 (got #{limit.inspect})."
  end
  table_ = [keyspace, table].compact.join('.')
  statement = "SELECT queue_name, created_at, payload FROM #{table_} LIMIT #{limit.to_i} ;"
  @semaphore.synchronize do
    return session.execute(session.prepare(statement), arguments: [])
  end
end
# File lib/polipus-cassandra/queue_overflow/cassandra_queue.rb, line 289
#
# True when `limit` can be coerced (via to_i) to a positive integer.
# nil and objects lacking to_i (e.g. true/false) are rejected.
def limit_is_valid?(limit)
  return false if limit.nil?
  return false unless limit.respond_to?(:to_i)
  limit.to_i > 0
end
# File lib/polipus-cassandra/queue_overflow/cassandra_queue.rb, line 285
#
# True when the options hash carries every key this queue requires:
# :cluster, :keyspace and :table.
def options_are_valid?(options)
  %i[cluster keyspace table].all? { |key| options.has_key?(key) }
end