class Meeseeker::BlockFollowerJob

Constants

MAX_VOP_RETRY

Public Instance Methods

perform(options = {}) click to toggle source
# File lib/meeseeker/block_follower_job.rb, line 5
# Follows the chain and mirrors every streamed operation into Redis.
#
# For each op yielded by #stream_operations this method:
# * writes the op JSON under "<prefix>:<block_num>:<trx_id>:<trx_index>:<op_type>"
#   (expiring it unless Meeseeker.expire_keys == -1)
# * publishes a "<prefix>:transaction" summary when the transaction boundary changes
# * publishes a "<prefix>:block" payload (optionally including the block header)
#   when the block boundary changes, and records the last-seen block number
# * publishes "<prefix>:op:<op_type>" — and, for custom/custom_binary/custom_json
#   ops, "<prefix>:op:<op_type>:<id>" — notifications carrying the Redis key
#
# options [Hash]:
#   :chain - chain name (String/Symbol); defaults to 'hive'
#   plus anything accepted by #stream_operations (:at_block_num, :mode, ...)
def perform(options = {})
  chain = (options[:chain] || 'hive').to_sym
  url = Meeseeker.default_url(chain)
  block_api = Meeseeker.block_api_class(chain).new(url: url)
  redis = Meeseeker.redis
  last_key_prefix = nil      # "<prefix>:<block>:<trx>" of the previous op
  trx_index = 0              # op position within the current transaction
  current_block_num = nil
  block_transactions = []
  # NOTE(review): the fallback here is Meeseeker.default_chain_key_prefix, but
  # #stream_operations uses Meeseeker.chain_key_prefix for the same purpose —
  # confirm the two resolve identically.
  chain_key_prefix = chain.to_s if !!options[:chain]
  chain_key_prefix ||= Meeseeker.default_chain_key_prefix
  
  stream_operations(options) do |op, trx_id, block_num|
    # NOTE(review): this begin/end has no rescue/ensure — it only groups
    # statements and could be removed without changing behavior.
    begin
      current_key_prefix = "#{chain_key_prefix}:#{block_num}:#{trx_id}"
      
      if current_key_prefix == last_key_prefix
        # Same transaction as the previous op: advance the op index.
        trx_index += 1
      else
        # Transaction boundary: publish a summary of the previous transaction.
        if !!last_key_prefix
          _, b, t = last_key_prefix.split(':')
          transaction_payload = {
            block_num: b.to_i,
            transaction_id: t,
            transaction_num: block_transactions.size
          }
          
          # NOTE(review): the *current* trx_id is appended while the payload
          # above describes the *previous* transaction — verify this ordering
          # is intended (transaction_num is taken before the append).
          block_transactions << trx_id unless trx_id == VIRTUAL_TRX_ID
          redis.publish("#{chain_key_prefix}:transaction", transaction_payload.to_json)
        end
        last_key_prefix = "#{chain_key_prefix}:#{block_num}:#{trx_id}"
        trx_index = 0
      end
      
      # Strip the "_operation" suffix, e.g. "vote_operation" -> "vote".
      op_type = if op.type.end_with? '_operation'
        op.type.split('_')[0..-2].join('_')
      else
        op.type
      end
      
      key = "#{current_key_prefix}:#{trx_index}:#{op_type}"
      puts key
    end
    
    # Throttle when the keyspace exceeds max_keys (-1 disables the cap):
    # block until existing keys expire before writing more.
    unless Meeseeker.max_keys == -1
      while redis.keys("#{chain_key_prefix}:*").size > Meeseeker.max_keys
        sleep Meeseeker::BLOCK_INTERVAL
      end
    end
    
    redis.set(key, op.to_json)
    redis.expire(key, Meeseeker.expire_keys) unless Meeseeker.expire_keys == -1
    
    # Block boundary: reset per-block state and announce the new block.
    if current_block_num != block_num
      block_transactions = []
      block_payload = {
        block_num: block_num
      }
      
      if Meeseeker.include_block_header
        catch :block_header do
          block_api.get_block_header(block_num: block_num) do |result|
            if result.nil? || result.header.nil?
              puts "Node returned empty result for block_header on block_num: #{block_num} (rate limiting?).  Retrying ..."
              sleep Meeseeker::BLOCK_INTERVAL
              # NOTE(review): throwing here abandons the header for this block
              # rather than retrying it, despite the "Retrying" message above.
              throw :block_header
            end
            
            block_payload.merge!(result.header.to_h)
          end
        end
      end
      
      redis.set(chain_key_prefix + LAST_BLOCK_NUM_KEY_SUFFIX, block_num)
      redis.publish("#{chain_key_prefix}:block", block_payload.to_json)
      current_block_num = block_num
    end
    
    redis.publish("#{chain_key_prefix}:op:#{op_type}", {key: key}.to_json)
    
    # Optionally re-publish custom ops keyed by their application-level id.
    if Meeseeker.publish_op_custom_id
      if %w(custom custom_binary custom_json).include? op_type
        id = (op["value"]["id"] rescue nil).to_s
        
        if id.size > 0
          redis.publish("#{chain_key_prefix}:op:#{op_type}:#{id}", {key: key}.to_json)
        end
      end
    end
  end
end

Private Instance Methods

stream_operations(options = {}) { |op, transaction_ids, n| ... } click to toggle source
# File lib/meeseeker/block_follower_job.rb, line 97
# Streams operations from the chain and yields each one to the given block.
#
# Start position: options[:at_block_num] when provided; otherwise resumes from
# the last block number recorded in Redis, comparing it against the node's
# current head/irreversible block — starting a fresh sync when the recorded
# position is more than expire_keys / 3 blocks behind (keys would already have
# expired), or syncing everything when expire_keys == -1.
#
# Streams whole blocks (rather than bare operations) so each op can be stamped
# with the block timestamp.  When virtual ops are enabled, they are fetched per
# block via condenser_api.get_ops_in_block with bounded retries (MAX_VOP_RETRY).
#
# options [Hash]:
#   :chain           - chain name (String/Symbol); defaults to 'hive'
#   :at_block_num    - explicit starting block number
#   :mode            - :head or :irreversible (defaults to Meeseeker.stream_mode)
#   :include_virtual - include virtual ops (defaults to Meeseeker.include_virtual)
#
# Yields (op, trx_id, block_num) for every (possibly virtual) operation.
def stream_operations(options = {}, &block)
  chain = (options[:chain] || 'hive').to_sym
  redis = Meeseeker.redis
  # NOTE(review): fallback is Meeseeker.chain_key_prefix here, but #perform
  # uses Meeseeker.default_chain_key_prefix — confirm both resolve the same.
  chain_key_prefix = chain.to_s if !!options[:chain]
  chain_key_prefix ||= Meeseeker.chain_key_prefix
  last_block_num = nil
  mode = options.delete(:mode) || Meeseeker.stream_mode
  options[:include_virtual] ||= Meeseeker.include_virtual
  
  if !!options[:at_block_num]
    last_block_num = options[:at_block_num].to_i
  else
    url = Meeseeker.default_url(chain)
    database_api = Meeseeker.database_api_class(chain).new(url: url)
    # Resume one block past the last block recorded by #perform.
    last_block_num = redis.get(chain_key_prefix + LAST_BLOCK_NUM_KEY_SUFFIX).to_i + 1
    
    # Current chain tip (head or irreversible, per mode); nil dgpo results
    # (rate limiting) make block_num nil via catch/throw.
    block_num = catch :dynamic_global_properties do
      database_api.get_dynamic_global_properties do |dgpo|
        throw :dynamic_global_properties if dgpo.nil?
        
        case mode
        when :head then dgpo.head_block_number
        when :irreversible then dgpo.last_irreversible_block_num
        else; abort "Unknown stream mode: #{mode}"
        end
      end
    end
      
    if Meeseeker.expire_keys == -1
      # Keys never expire: sync forward from whichever position is furthest.
      last_block_num = [last_block_num, block_num].max
      
      puts "Sync from: #{last_block_num}"
    elsif block_num - last_block_num > Meeseeker.expire_keys / 3
      # Too far behind — any keys we would backfill have already expired.
      last_block_num = block_num
      
      puts 'Starting new sync.'
    else
      # ~3 seconds per block.
      behind_sec = block_num - last_block_num
      behind_sec *= 3.0
      
      puts "Resuming from #{behind_sec / 60} minutes ago ..."
    end
  end
  
  begin
    url = Meeseeker.default_url(chain)
    stream_options = {url: url, mode: mode}
    options = options.merge(at_block_num: last_block_num)
    condenser_api = nil
    
    Meeseeker.stream_class.new(stream_options).tap do |stream|
      puts "Stream begin: #{stream_options.to_json}; #{options.to_json}"
      
      # Prior to v0.0.4, we only streamed operations with stream.operations.
      
      # After v0.0.5, we stream blocks so that we can get block.timestamp,
      # to embed it into op values.  This should also reduce streaming
      # overhead since we no longer stream block_headers under the hood.
      
      loop do
        begin
          stream.blocks(options) do |b, n|
            # Empty block result (rate limiting?) — re-run this iteration.
            redo if b.nil?
            
            b.transactions.each_with_index do |transaction, index|
              transaction.operations.each do |op|
                # Stamp each op with its block timestamp before yielding.
                op = op.merge(timestamp: b.timestamp)
                
                yield op, b.transaction_ids[index], n
              end
            end
            
            next unless !!Meeseeker.include_virtual
            
            retries = 0
            
            # This is where it gets tricky.  Virtual ops sometimes don't show up
            # right away, especially if we're streaming on head blocks.  In that
            # situation, we might only need to wait about 1 block.  This loop
            # will likely only execute one iteration, but we have fallback logic
            # in case there are complications.
            #
            # See: https://developers.steem.io/tutorials-recipes/virtual-operations-when-streaming-blockchain-transactions
            
            loop do
              # TODO (HF23) Switch to account_history_api.enum_virtual_ops if supported.
              url = Meeseeker.default_url(chain)
              condenser_api ||= Meeseeker.condenser_api_class(chain).new(url: url)
              condenser_api.get_ops_in_block(n, true) do |vops|
                if vops.nil?
                  puts "Node returned empty result for get_ops_in_block on block_num: #{n} (rate limiting?).  Retrying ..."
                  vops = []
                end
                
                if vops.empty? && mode != :head
                  # Usually, we just need to slow down to allow virtual ops to
                  # show up after a short delay.  Adding this delay doesn't
                  # impact overall performance because steem-ruby will batch
                  # when block streams fall behind.
                  
                  if retries < MAX_VOP_RETRY
                    retries = retries + 1
                    # Drop the client so the next attempt reconnects fresh,
                    # backing off linearly with the retry count.
                    condenser_api = nil
                    sleep Meeseeker::BLOCK_INTERVAL * retries
                    
                    redo
                  end
                  
                  puts "Gave up retrying virtual ops lookup on block #{n}"
                  
                  break
                end
                
                if retries > 0
                  # NOTE(review): "aftere ... retrie(s)" is a typo in this
                  # runtime log message — left as-is here.
                  puts "Found virtual ops for block #{n} aftere #{retries} retrie(s)"
                end
                
                # Normalize [type, value] virtual-op pairs to the same shape as
                # streamed ops (type/value/timestamp) before yielding.
                vops.each do |vop|
                  normalized_op = Hashie::Mash.new(
                    type: vop.op[0],
                    value: vop.op[1], 
                    timestamp: vop.timestamp
                  )
                  
                  yield normalized_op, vop.trx_id, vop.block
                end
              end
              
              break
            end
          end
          
          break
        rescue => e
          raise e unless e.to_s.include? 'Request Entity Too Large'
          
          # We need to tell steem-ruby to avoid json-rpc-batch on this
          # node.
          
          Meeseeker.block_api_class(chain).const_set 'MAX_RANGE_SIZE', 1
          sleep Meeseeker::BLOCK_INTERVAL
          redo
        end
      end
    end
  end
end