class Meeseeker::SteemEngine::FollowerJob
Public Class Methods
new(options = {})
click to toggle source
# File lib/meeseeker/steem_engine/follower_job.rb, line 5 def initialize(options = {}) @chain_key_prefix = options[:chain_key_prefix] || Meeseeker::STEEM_ENGINE_CHAIN_KEY_PREFIX end
Public Instance Methods
chain_name()
click to toggle source
# File lib/meeseeker/steem_engine/follower_job.rb, line 9 def chain_name @chain_key_prefix.split('_').map(&:capitalize).join(' ') end
perform(options = {})
click to toggle source
# File lib/meeseeker/steem_engine/follower_job.rb, line 13 def perform(options = {}) redis = Meeseeker.redis last_key_prefix = nil trx_index = 0 current_block_num = nil block_transactions = [] stream_transactions(options) do |data, block| transaction = data[:transaction] virtual = !!data[:virtual] begin trx_id = transaction['transactionId'].to_s.split('-').first block_num = block['blockNumber'] current_key_prefix = "#{@chain_key_prefix}:#{block_num}:#{trx_id}" contract = transaction['contract'] action = transaction['action'] if current_key_prefix == last_key_prefix trx_index += 1 else if !!last_key_prefix _, b, t = last_key_prefix.split(':') transaction_payload = { block_num: b.to_i, transaction_id: t, transaction_num: block_transactions.size } block_transactions << trx_id trx_pub_key = if !!virtual "#{@chain_key_prefix}:virtual_transaction" else "#{@chain_key_prefix}:transaction" end redis.publish(trx_pub_key, transaction_payload.to_json) end last_key_prefix = "#{@chain_key_prefix}:#{block_num}:#{trx_id}" trx_index = 0 end key = "#{current_key_prefix}:#{trx_index}:#{contract}:#{action}" puts key end unless Meeseeker.max_keys == -1 while redis.keys("#{@chain_key_prefix}:*").size > Meeseeker.max_keys sleep Meeseeker::BLOCK_INTERVAL end end redis.set(key, transaction.to_json) redis.expire(key, Meeseeker.expire_keys) unless Meeseeker.expire_keys == -1 if current_block_num != block_num block_transactions = [] block_payload = { block_num: block_num } redis.set(@chain_key_prefix + Meeseeker::LAST_STEEM_ENGINE_BLOCK_NUM_KEY_SUFFIX, block_num) redis.publish("#{@chain_key_prefix}:block", block_payload.to_json) current_block_num = block_num end redis.publish("#{@chain_key_prefix}:#{contract}", {key: key}.to_json) redis.publish("#{@chain_key_prefix}:#{contract}:#{action}", {key: key}.to_json) end end
Private Instance Methods
agent()
click to toggle source
# File lib/meeseeker/steem_engine/follower_job.rb, line 86 def agent @agent ||= case @chain_key_prefix when 'steem_engine' then Agent.new when 'hive_engine' then Meeseeker::HiveEngine::Agent.new end end
agent_reset()
click to toggle source
# File lib/meeseeker/steem_engine/follower_job.rb, line 93 def agent_reset return if @agent.nil? @agent.shutdown @agent = nil end
reset_retry_interval()
click to toggle source
# File lib/meeseeker/steem_engine/follower_job.rb, line 107 def reset_retry_interval @retry_interval = nil end
retry_interval()
click to toggle source
# File lib/meeseeker/steem_engine/follower_job.rb, line 100 def retry_interval @retry_interval ||= 0.1 @retry_interval *= 2 [@retry_interval, MAX_RETRY_INTERVAL].min end
stream_transactions(options = {}) { |{transaction: merge(timestamp: block)}, block| ... }
click to toggle source
# File lib/meeseeker/steem_engine/follower_job.rb, line 111 def stream_transactions(options = {}, &block) redis = Meeseeker.redis last_block_num = nil until_block_num = options[:until_block_num].to_i if !!options[:at_block_num] last_block_num = options[:at_block_num].to_i else new_sync = false last_block_num = redis.get(@chain_key_prefix + Meeseeker::LAST_STEEM_ENGINE_BLOCK_NUM_KEY_SUFFIX) block_info = agent.latest_block_info block_num = block_info['blockNumber'] last_block = agent.block(block_num) last_block_timestamp = Time.parse(last_block['timestamp'] + 'Z') if last_block_num.nil? new_sync = true last_block_num = block_num else last_block_num = last_block_num.to_i + 1 end if Meeseeker.expire_keys == -1 last_block_num = [last_block_num, block_num].max puts "Sync #{chain_name} from: #{last_block_num}" elsif new_sync || (Time.now.utc - last_block_timestamp > Meeseeker.expire_keys) last_block_num = block_num + 1 puts "Starting new #{chain_name} sync." else puts "Resuming from #{chain_name} block #{last_block_num} ..." end end block_num = last_block_num loop do begin block = agent.block(block_num) reset_retry_interval rescue Net::HTTP::Persistent::Error => e puts "Retrying: #{e}" agent_reset sleep retry_interval redo end if block.nil? sleep Meeseeker::BLOCK_INTERVAL redo end transactions = block['transactions'] transactions.each do |transaction| yield({transaction: transaction.merge(timestamp: block['timestamp'])}, block) end virtual_transactions = block['virtualTransactions'] virtual_transactions.each do |virtual_transaction| _, vtrx_in_block = virtual_transaction['transactionId'].split('-') virtual_transaction = virtual_transaction.merge( timestamp: block['timestamp'], 'transactionId' => "#{Meeseeker::VIRTUAL_TRX_ID}-#{vtrx_in_block}" ) yield({transaction: virtual_transaction, virtual: true}, block) end break if until_block_num != 0 && block_num > until_block_num block_num = block_num + 1 end end