class Capacitor::CommandsFetcher
Public Instance Methods
block_on_incoming_signal_list()
click to toggle source
# File lib/capacitor/commands_fetcher.rb, line 25 def block_on_incoming_signal_list false until incoming_signal_list flush_incoming_signal_list end
blocking_timeout()
click to toggle source
# File lib/capacitor/commands_fetcher.rb, line 9 def blocking_timeout 60 end
fetch()
click to toggle source
# File lib/capacitor/commands_fetcher.rb, line 5 def fetch new.retrieve_batch end
flush_batch()
click to toggle source
# File lib/capacitor/commands_fetcher.rb, line 72 def flush_batch # Safely processed now, kill the batch in redis redis.del "processing_hash", "retry_hash" end
flush_incoming_signal_list()
click to toggle source
# File lib/capacitor/commands_fetcher.rb, line 30 def flush_incoming_signal_list redis.del "incoming_signal_list" end
flush_retried_batch()
click to toggle source
# File lib/capacitor/commands_fetcher.rb, line 77 def flush_retried_batch if redis.hlen("retry_hash") > 0 failure = 'failure:' + Time.new.utc.to_f.to_s redis.rename "retry_hash", failure redis.lpush "failed_hash_keys", failure logger.error "retry_hash moved to #{failure}" end end
incoming_signal_list()
click to toggle source
# File lib/capacitor/commands_fetcher.rb, line 21 def incoming_signal_list redis.blpop "incoming_signal_list", blocking_timeout end
logger()
click to toggle source
# File lib/capacitor/commands_fetcher.rb, line 17 def logger Capacitor.logger end
redis()
click to toggle source
# File lib/capacitor/commands_fetcher.rb, line 13 def redis Capacitor.redis end
retrieve_batch()
click to toggle source
When things are working well :incoming_hash -> :processing_hash -> flush()
If a batch fails once :processing_hash -> :retry_hash -> flush()
If a batch fails again :retry_hash -> :failed_hash_keys -> flush() then start over with :incoming_hash
# File lib/capacitor/commands_fetcher.rb, line 67 def retrieve_batch flush_retried_batch return retrieve_existing_batch || retrieve_current_batch end
retrieve_current_batch()
click to toggle source
# File lib/capacitor/commands_fetcher.rb, line 45 def retrieve_current_batch begin result = redis.rename "incoming_hash", "processing_hash" rescue Exception => e # This means we got a signal without getting data, which is # probably okay due to the harmless race condition, but might # warrant investigation later, so let's log it and move on. logger.warn "empty incoming_hash in retrieve_batch" return {} end redis.hgetall "processing_hash" end
retrieve_existing_batch()
click to toggle source
# File lib/capacitor/commands_fetcher.rb, line 34 def retrieve_existing_batch batch = redis.hgetall "processing_hash" if !batch.empty? redis.rename "processing_hash", "retry_hash" logger.error "processing_hash moved to retry_hash" batch else nil end end